This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new 87965e9b9ad branch-4.0: [refactor](paimon) Per-catalog Paimon metadata
cache with two-level table+snapshot structure #60478 (#60741)
87965e9b9ad is described below
commit 87965e9b9ad2cd5d769dad434ecd39bc7a71f0a6
Author: Socrates <[email protected]>
AuthorDate: Mon Mar 9 09:29:56 2026 +0800
branch-4.0: [refactor](paimon) Per-catalog Paimon metadata cache with
two-level table+snapshot structure #60478 (#60741)
- Cherry-picked from #60478
- Keep branch-4.0 compatibility in PaimonExternalCatalog without
introducing #58894
---
.../doris/datasource/CatalogScopedCacheMgr.java | 43 +++++
.../apache/doris/datasource/ExternalCatalog.java | 1 -
.../doris/datasource/ExternalMetaCacheMgr.java | 133 ++++++--------
.../doris/datasource/ExternalSchemaCache.java | 13 +-
.../datasource/iceberg/IcebergExternalCatalog.java | 70 +++-----
.../datasource/iceberg/IcebergMetadataCache.java | 135 +++++++-------
.../doris/datasource/iceberg/IcebergUtils.java | 14 +-
.../iceberg/cache/ContentFileEstimator.java | 194 ---------------------
.../iceberg/cache/IcebergManifestCache.java | 27 ++-
.../iceberg/cache/ManifestCacheValue.java | 20 +--
.../doris/datasource/metacache/CacheSpec.java | 152 ++++++++++++++++
.../datasource/paimon/PaimonExternalCatalog.java | 23 +++
.../datasource/paimon/PaimonExternalTable.java | 52 +++---
.../datasource/paimon/PaimonMetadataCache.java | 152 +++++++++++-----
.../datasource/paimon/PaimonMetadataCacheMgr.java | 51 ------
.../datasource/paimon/PaimonSnapshotCacheKey.java | 60 -------
.../datasource/paimon/PaimonTableCacheValue.java | 61 +++++++
.../doris/datasource/paimon/PaimonUtils.java | 64 +++++++
.../datasource/paimon/source/PaimonScanNode.java | 5 +-
.../metastore/AbstractIcebergProperties.java | 20 +++
.../org/apache/doris/regression/suite/Suite.groovy | 35 +++-
.../iceberg/test_iceberg_manifest_cache.groovy | 5 +-
.../iceberg/test_iceberg_table_cache.groovy | 2 +-
.../iceberg/test_iceberg_table_meta_cache.groovy | 18 +-
.../paimon/test_paimon_table_meta_cache.groovy | 128 ++++++++++++++
25 files changed, 856 insertions(+), 622 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogScopedCacheMgr.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogScopedCacheMgr.java
new file mode 100644
index 00000000000..6b9afc1b74a
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogScopedCacheMgr.java
@@ -0,0 +1,43 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Function;
+
+public class CatalogScopedCacheMgr<T> {
+ private final Map<Long, T> cacheMap = new ConcurrentHashMap<>();
+ private final Function<ExternalCatalog, T> cacheFactory;
+
+ public CatalogScopedCacheMgr(Function<ExternalCatalog, T> cacheFactory) {
+ this.cacheFactory = cacheFactory;
+ }
+
+ public T getCache(ExternalCatalog catalog) {
+ return cacheMap.computeIfAbsent(catalog.getId(), id ->
cacheFactory.apply(catalog));
+ }
+
+ public T getCache(long catalogId) {
+ return cacheMap.get(catalogId);
+ }
+
+ public T removeCache(long catalogId) {
+ return cacheMap.remove(catalogId);
+ }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
index 3501fcafb5e..1ccb3801b9b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
@@ -1450,4 +1450,3 @@ public abstract class ExternalCatalog
}
}
}
-
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java
index 8d6aa5522f1..9b44833f01a 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java
@@ -36,7 +36,6 @@ import
org.apache.doris.datasource.maxcompute.MaxComputeMetadataCacheMgr;
import org.apache.doris.datasource.metacache.MetaCache;
import org.apache.doris.datasource.mvcc.MvccUtil;
import org.apache.doris.datasource.paimon.PaimonMetadataCache;
-import org.apache.doris.datasource.paimon.PaimonMetadataCacheMgr;
import org.apache.doris.fs.FileSystemCache;
import org.apache.doris.nereids.exceptions.NotSupportedException;
@@ -87,13 +86,10 @@ public class ExternalMetaCacheMgr {
// This executor is used to schedule the getting split tasks
private ExecutorService scheduleExecutor;
- // catalog id -> HiveMetaStoreCache
- private final Map<Long, HiveMetaStoreCache> hiveCacheMap =
Maps.newConcurrentMap();
-
- // catalog id -> IcebergMetadataCache
- private final Map<Long, IcebergMetadataCache> icebergCacheMap =
Maps.newConcurrentMap();
- // catalog id -> table schema cache
- private final Map<Long, ExternalSchemaCache> schemaCacheMap =
Maps.newHashMap();
+ private final CatalogScopedCacheMgr<HiveMetaStoreCache>
hiveMetaStoreCacheMgr;
+ private final CatalogScopedCacheMgr<IcebergMetadataCache>
icebergMetadataCacheMgr;
+ private final CatalogScopedCacheMgr<PaimonMetadataCache>
paimonMetadataCacheMgr;
+ private final CatalogScopedCacheMgr<ExternalSchemaCache> schemaCacheMgr;
// hudi partition manager
private final HudiMetadataCacheMgr hudiMetadataCacheMgr;
// all catalogs could share the same fsCache.
@@ -101,7 +97,6 @@ public class ExternalMetaCacheMgr {
// all external table row count cache.
private ExternalRowCountCache rowCountCache;
private final MaxComputeMetadataCacheMgr maxComputeMetadataCacheMgr;
- private final PaimonMetadataCacheMgr paimonMetadataCacheMgr;
private final DorisExternalMetaCacheMgr dorisExternalMetaCacheMgr;
public ExternalMetaCacheMgr(boolean isCheckpointCatalog) {
@@ -132,7 +127,15 @@ public class ExternalMetaCacheMgr {
hudiMetadataCacheMgr = new HudiMetadataCacheMgr(commonRefreshExecutor);
maxComputeMetadataCacheMgr = new MaxComputeMetadataCacheMgr();
- paimonMetadataCacheMgr = new
PaimonMetadataCacheMgr(commonRefreshExecutor);
+ hiveMetaStoreCacheMgr = new CatalogScopedCacheMgr<>(
+ catalog -> new HiveMetaStoreCache((HMSExternalCatalog) catalog,
+ commonRefreshExecutor, fileListingExecutor));
+ icebergMetadataCacheMgr = new CatalogScopedCacheMgr<>(
+ catalog -> new IcebergMetadataCache(catalog,
commonRefreshExecutor));
+ schemaCacheMgr = new CatalogScopedCacheMgr<>(
+ catalog -> new ExternalSchemaCache(catalog,
commonRefreshExecutor));
+ paimonMetadataCacheMgr = new CatalogScopedCacheMgr<>(
+ catalog -> new PaimonMetadataCache(catalog,
commonRefreshExecutor));
dorisExternalMetaCacheMgr = new
DorisExternalMetaCacheMgr(commonRefreshExecutor);
}
@@ -160,30 +163,11 @@ public class ExternalMetaCacheMgr {
}
public HiveMetaStoreCache getMetaStoreCache(HMSExternalCatalog catalog) {
- HiveMetaStoreCache cache = hiveCacheMap.get(catalog.getId());
- if (cache == null) {
- synchronized (hiveCacheMap) {
- if (!hiveCacheMap.containsKey(catalog.getId())) {
- hiveCacheMap.put(catalog.getId(),
- new HiveMetaStoreCache(catalog,
commonRefreshExecutor, fileListingExecutor));
- }
- cache = hiveCacheMap.get(catalog.getId());
- }
- }
- return cache;
+ return hiveMetaStoreCacheMgr.getCache(catalog);
}
public ExternalSchemaCache getSchemaCache(ExternalCatalog catalog) {
- ExternalSchemaCache cache = schemaCacheMap.get(catalog.getId());
- if (cache == null) {
- synchronized (schemaCacheMap) {
- if (!schemaCacheMap.containsKey(catalog.getId())) {
- schemaCacheMap.put(catalog.getId(), new
ExternalSchemaCache(catalog, commonRefreshExecutor));
- }
- cache = schemaCacheMap.get(catalog.getId());
- }
- }
- return cache;
+ return schemaCacheMgr.getCache(catalog);
}
public HudiPartitionProcessor getHudiPartitionProcess(ExternalCatalog
catalog) {
@@ -203,20 +187,11 @@ public class ExternalMetaCacheMgr {
}
public IcebergMetadataCache getIcebergMetadataCache(ExternalCatalog
catalog) {
- IcebergMetadataCache cache = icebergCacheMap.get(catalog.getId());
- if (cache == null) {
- synchronized (icebergCacheMap) {
- if (!icebergCacheMap.containsKey(catalog.getId())) {
- icebergCacheMap.put(catalog.getId(), new
IcebergMetadataCache(catalog, commonRefreshExecutor));
- }
- cache = icebergCacheMap.get(catalog.getId());
- }
- }
- return cache;
+ return icebergMetadataCacheMgr.getCache(catalog);
}
- public PaimonMetadataCache getPaimonMetadataCache() {
- return paimonMetadataCacheMgr.getPaimonMetadataCache();
+ public PaimonMetadataCache getPaimonMetadataCache(ExternalCatalog catalog)
{
+ return paimonMetadataCacheMgr.getCache(catalog);
}
public MaxComputeMetadataCache getMaxComputeMetadataCache(long catalogId) {
@@ -236,41 +211,43 @@ public class ExternalMetaCacheMgr {
}
public void removeCache(long catalogId) {
- if (hiveCacheMap.remove(catalogId) != null) {
+ if (hiveMetaStoreCacheMgr.removeCache(catalogId) != null) {
LOG.info("remove hive metastore cache for catalog {}", catalogId);
}
- synchronized (schemaCacheMap) {
- if (schemaCacheMap.remove(catalogId) != null) {
- LOG.info("remove schema cache for catalog {}", catalogId);
- }
+ if (schemaCacheMgr.removeCache(catalogId) != null) {
+ LOG.info("remove schema cache for catalog {}", catalogId);
}
- if (icebergCacheMap.remove(catalogId) != null) {
+ if (icebergMetadataCacheMgr.removeCache(catalogId) != null) {
LOG.info("remove iceberg meta cache for catalog {}", catalogId);
}
hudiMetadataCacheMgr.removeCache(catalogId);
maxComputeMetadataCacheMgr.removeCache(catalogId);
- paimonMetadataCacheMgr.removeCache(catalogId);
+ PaimonMetadataCache paimonMetadataCache =
paimonMetadataCacheMgr.removeCache(catalogId);
+ if (paimonMetadataCache != null) {
+ paimonMetadataCache.invalidateCatalogCache(catalogId);
+ }
dorisExternalMetaCacheMgr.removeCache(catalogId);
}
public void invalidateTableCache(ExternalTable dorisTable) {
- synchronized (schemaCacheMap) {
- ExternalSchemaCache schemaCache =
schemaCacheMap.get(dorisTable.getCatalog().getId());
- if (schemaCache != null) {
- schemaCache.invalidateTableCache(dorisTable);
- }
+ ExternalSchemaCache schemaCache =
schemaCacheMgr.getCache(dorisTable.getCatalog().getId());
+ if (schemaCache != null) {
+ schemaCache.invalidateTableCache(dorisTable);
}
- HiveMetaStoreCache hiveMetaCache =
hiveCacheMap.get(dorisTable.getCatalog().getId());
+ HiveMetaStoreCache hiveMetaCache =
hiveMetaStoreCacheMgr.getCache(dorisTable.getCatalog().getId());
if (hiveMetaCache != null) {
hiveMetaCache.invalidateTableCache(dorisTable.getOrBuildNameMapping());
}
- IcebergMetadataCache icebergMetadataCache =
icebergCacheMap.get(dorisTable.getCatalog().getId());
+ IcebergMetadataCache icebergMetadataCache =
icebergMetadataCacheMgr.getCache(dorisTable.getCatalog().getId());
if (icebergMetadataCache != null) {
icebergMetadataCache.invalidateTableCache(dorisTable);
}
hudiMetadataCacheMgr.invalidateTableCache(dorisTable);
maxComputeMetadataCacheMgr.invalidateTableCache(dorisTable);
- paimonMetadataCacheMgr.invalidateTableCache(dorisTable);
+ PaimonMetadataCache paimonMetadataCache =
paimonMetadataCacheMgr.getCache(dorisTable.getCatalog().getId());
+ if (paimonMetadataCache != null) {
+ paimonMetadataCache.invalidateTableCache(dorisTable);
+ }
if (LOG.isDebugEnabled()) {
LOG.debug("invalid table cache for {}.{} in catalog {}",
dorisTable.getRemoteDbName(),
dorisTable.getRemoteName(),
dorisTable.getCatalog().getName());
@@ -279,43 +256,45 @@ public class ExternalMetaCacheMgr {
public void invalidateDbCache(long catalogId, String dbName) {
dbName = ClusterNamespace.getNameFromFullName(dbName);
- synchronized (schemaCacheMap) {
- ExternalSchemaCache schemaCache = schemaCacheMap.get(catalogId);
- if (schemaCache != null) {
- schemaCache.invalidateDbCache(dbName);
- }
+ ExternalSchemaCache schemaCache = schemaCacheMgr.getCache(catalogId);
+ if (schemaCache != null) {
+ schemaCache.invalidateDbCache(dbName);
}
- HiveMetaStoreCache metaCache = hiveCacheMap.get(catalogId);
+ HiveMetaStoreCache metaCache =
hiveMetaStoreCacheMgr.getCache(catalogId);
if (metaCache != null) {
metaCache.invalidateDbCache(dbName);
}
- IcebergMetadataCache icebergMetadataCache =
icebergCacheMap.get(catalogId);
+ IcebergMetadataCache icebergMetadataCache =
icebergMetadataCacheMgr.getCache(catalogId);
if (icebergMetadataCache != null) {
icebergMetadataCache.invalidateDbCache(catalogId, dbName);
}
hudiMetadataCacheMgr.invalidateDbCache(catalogId, dbName);
maxComputeMetadataCacheMgr.invalidateDbCache(catalogId, dbName);
- paimonMetadataCacheMgr.invalidateDbCache(catalogId, dbName);
+ PaimonMetadataCache paimonMetadataCache =
paimonMetadataCacheMgr.getCache(catalogId);
+ if (paimonMetadataCache != null) {
+ paimonMetadataCache.invalidateDbCache(catalogId, dbName);
+ }
if (LOG.isDebugEnabled()) {
LOG.debug("invalid db cache for {} in catalog {}", dbName,
catalogId);
}
}
public void invalidateCatalogCache(long catalogId) {
- synchronized (schemaCacheMap) {
- schemaCacheMap.remove(catalogId);
- }
- HiveMetaStoreCache metaCache = hiveCacheMap.get(catalogId);
+ schemaCacheMgr.removeCache(catalogId);
+ HiveMetaStoreCache metaCache =
hiveMetaStoreCacheMgr.getCache(catalogId);
if (metaCache != null) {
metaCache.invalidateAll();
}
- IcebergMetadataCache icebergMetadataCache =
icebergCacheMap.get(catalogId);
+ IcebergMetadataCache icebergMetadataCache =
icebergMetadataCacheMgr.getCache(catalogId);
if (icebergMetadataCache != null) {
icebergMetadataCache.invalidateCatalogCache(catalogId);
}
hudiMetadataCacheMgr.invalidateCatalogCache(catalogId);
maxComputeMetadataCacheMgr.invalidateCatalogCache(catalogId);
- paimonMetadataCacheMgr.invalidateCatalogCache(catalogId);
+ PaimonMetadataCache paimonMetadataCache =
paimonMetadataCacheMgr.getCache(catalogId);
+ if (paimonMetadataCache != null) {
+ paimonMetadataCache.invalidateCatalogCache(catalogId);
+ }
dorisExternalMetaCacheMgr.invalidateCatalogCache(catalogId);
if (LOG.isDebugEnabled()) {
LOG.debug("invalid catalog cache for {}", catalogId);
@@ -323,14 +302,12 @@ public class ExternalMetaCacheMgr {
}
public void invalidSchemaCache(long catalogId) {
- synchronized (schemaCacheMap) {
- schemaCacheMap.remove(catalogId);
- }
+ schemaCacheMgr.removeCache(catalogId);
}
public void addPartitionsCache(long catalogId, HMSExternalTable table,
List<String> partitionNames) {
String dbName =
ClusterNamespace.getNameFromFullName(table.getDbName());
- HiveMetaStoreCache metaCache = hiveCacheMap.get(catalogId);
+ HiveMetaStoreCache metaCache =
hiveMetaStoreCacheMgr.getCache(catalogId);
if (metaCache != null) {
List<Type> partitionColumnTypes;
try {
@@ -348,7 +325,7 @@ public class ExternalMetaCacheMgr {
public void dropPartitionsCache(long catalogId, HMSExternalTable table,
List<String> partitionNames) {
String dbName =
ClusterNamespace.getNameFromFullName(table.getDbName());
- HiveMetaStoreCache metaCache = hiveCacheMap.get(catalogId);
+ HiveMetaStoreCache metaCache =
hiveMetaStoreCacheMgr.getCache(catalogId);
if (metaCache != null) {
metaCache.dropPartitionsCache(table, partitionNames, true);
}
@@ -358,7 +335,7 @@ public class ExternalMetaCacheMgr {
}
public void invalidatePartitionsCache(ExternalTable dorisTable,
List<String> partitionNames) {
- HiveMetaStoreCache metaCache =
hiveCacheMap.get(dorisTable.getCatalog().getId());
+ HiveMetaStoreCache metaCache =
hiveMetaStoreCacheMgr.getCache(dorisTable.getCatalog().getId());
if (metaCache != null) {
for (String partitionName : partitionNames) {
metaCache.invalidatePartitionCache(dorisTable, partitionName);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalSchemaCache.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalSchemaCache.java
index a1c0236eeb4..cf129ea8623 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalSchemaCache.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalSchemaCache.java
@@ -19,6 +19,7 @@ package org.apache.doris.datasource;
import org.apache.doris.common.CacheFactory;
import org.apache.doris.common.Config;
+import org.apache.doris.datasource.metacache.CacheSpec;
import org.apache.doris.metric.GaugeMetric;
import org.apache.doris.metric.Metric;
import org.apache.doris.metric.MetricLabel;
@@ -26,7 +27,6 @@ import org.apache.doris.metric.MetricRepo;
import com.github.benmanes.caffeine.cache.LoadingCache;
import lombok.Data;
-import org.apache.commons.lang3.math.NumberUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -49,13 +49,14 @@ public class ExternalSchemaCache {
}
private void init(ExecutorService executor) {
- long schemaCacheTtlSecond = NumberUtils.toLong(
-
(catalog.getProperties().get(ExternalCatalog.SCHEMA_CACHE_TTL_SECOND)),
ExternalCatalog.CACHE_NO_TTL);
+ CacheSpec cacheSpec = CacheSpec.fromTtlValue(
+
catalog.getProperties().get(ExternalCatalog.SCHEMA_CACHE_TTL_SECOND),
+ Config.external_cache_expire_time_seconds_after_access,
+ Config.max_external_schema_cache_num);
CacheFactory schemaCacheFactory = new CacheFactory(
- OptionalLong.of(schemaCacheTtlSecond >=
ExternalCatalog.CACHE_TTL_DISABLE_CACHE
- ? schemaCacheTtlSecond :
Config.external_cache_expire_time_seconds_after_access),
+ CacheSpec.toExpireAfterAccess(cacheSpec.getTtlSecond()),
OptionalLong.of(Config.external_cache_refresh_time_minutes *
60),
- Config.max_external_schema_cache_num,
+ cacheSpec.getCapacity(),
false,
null);
schemaCache = schemaCacheFactory.buildCache(this::loadSchema,
executor);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java
index 039bba4ed0f..ee8ad8b4fc0 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java
@@ -28,11 +28,11 @@ import org.apache.doris.datasource.ExternalCatalog;
import org.apache.doris.datasource.ExternalObjectLog;
import org.apache.doris.datasource.InitCatalogLog;
import org.apache.doris.datasource.SessionContext;
+import org.apache.doris.datasource.metacache.CacheSpec;
import org.apache.doris.datasource.operations.ExternalMetadataOperations;
import
org.apache.doris.datasource.property.metastore.AbstractIcebergProperties;
import org.apache.doris.transaction.TransactionManagerFactory;
-import org.apache.commons.lang3.math.NumberUtils;
import org.apache.iceberg.catalog.Catalog;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -52,12 +52,14 @@ public abstract class IcebergExternalCatalog extends
ExternalCatalog {
public static final String ICEBERG_DLF = "dlf";
public static final String ICEBERG_S3_TABLES = "s3tables";
public static final String EXTERNAL_CATALOG_NAME = "external_catalog.name";
- public static final String ICEBERG_TABLE_META_CACHE_TTL_SECOND =
"iceberg.table.meta.cache.ttl-second";
- public static final String ICEBERG_MANIFEST_CACHE_ENABLE =
"iceberg.manifest.cache.enable";
- public static final String ICEBERG_MANIFEST_CACHE_CAPACITY_MB =
"iceberg.manifest.cache.capacity-mb";
- public static final String ICEBERG_MANIFEST_CACHE_TTL_SECOND =
"iceberg.manifest.cache.ttl-second";
+ public static final String ICEBERG_TABLE_CACHE_ENABLE =
"meta.cache.iceberg.table.enable";
+ public static final String ICEBERG_TABLE_CACHE_TTL_SECOND =
"meta.cache.iceberg.table.ttl-second";
+ public static final String ICEBERG_TABLE_CACHE_CAPACITY =
"meta.cache.iceberg.table.capacity";
+ public static final String ICEBERG_MANIFEST_CACHE_ENABLE =
"meta.cache.iceberg.manifest.enable";
+ public static final String ICEBERG_MANIFEST_CACHE_TTL_SECOND =
"meta.cache.iceberg.manifest.ttl-second";
+ public static final String ICEBERG_MANIFEST_CACHE_CAPACITY =
"meta.cache.iceberg.manifest.capacity";
public static final boolean DEFAULT_ICEBERG_MANIFEST_CACHE_ENABLE = false;
- public static final long DEFAULT_ICEBERG_MANIFEST_CACHE_CAPACITY_MB = 1024;
+ public static final long DEFAULT_ICEBERG_MANIFEST_CACHE_CAPACITY = 1024;
public static final long DEFAULT_ICEBERG_MANIFEST_CACHE_TTL_SECOND = 48 *
60 * 60;
protected String icebergCatalogType;
protected Catalog catalog;
@@ -86,51 +88,33 @@ public abstract class IcebergExternalCatalog extends
ExternalCatalog {
@Override
public void checkProperties() throws DdlException {
super.checkProperties();
- // check iceberg.table.meta.cache.ttl-second parameter
- String tableMetaCacheTtlSecond =
catalogProperty.getOrDefault(ICEBERG_TABLE_META_CACHE_TTL_SECOND, null);
- if (Objects.nonNull(tableMetaCacheTtlSecond) &&
NumberUtils.toInt(tableMetaCacheTtlSecond, CACHE_NO_TTL)
- < CACHE_TTL_DISABLE_CACHE) {
- throw new DdlException(
- "The parameter " + ICEBERG_TABLE_META_CACHE_TTL_SECOND + "
is wrong, value is "
- + tableMetaCacheTtlSecond);
- }
-
- String manifestCacheEnable =
catalogProperty.getOrDefault(ICEBERG_MANIFEST_CACHE_ENABLE, null);
- if (Objects.nonNull(manifestCacheEnable)
- && !(manifestCacheEnable.equalsIgnoreCase("true") ||
manifestCacheEnable.equalsIgnoreCase("false"))) {
- throw new DdlException(
- "The parameter " + ICEBERG_MANIFEST_CACHE_ENABLE + " is
wrong, value is "
- + manifestCacheEnable);
- }
-
- String manifestCacheCapacity =
catalogProperty.getOrDefault(ICEBERG_MANIFEST_CACHE_CAPACITY_MB, null);
- if (Objects.nonNull(manifestCacheCapacity) &&
NumberUtils.toLong(manifestCacheCapacity, -1) <= 0) {
- throw new DdlException(
- "The parameter " + ICEBERG_MANIFEST_CACHE_CAPACITY_MB + "
is wrong, value is "
- + manifestCacheCapacity);
- }
-
- String manifestCacheTtlSecond =
catalogProperty.getOrDefault(ICEBERG_MANIFEST_CACHE_TTL_SECOND, null);
- if (Objects.nonNull(manifestCacheTtlSecond)
- && NumberUtils.toLong(manifestCacheTtlSecond, CACHE_NO_TTL) <
CACHE_TTL_DISABLE_CACHE) {
- throw new DdlException(
- "The parameter " + ICEBERG_MANIFEST_CACHE_TTL_SECOND + "
is wrong, value is "
- + manifestCacheTtlSecond);
- }
+
CacheSpec.checkBooleanProperty(catalogProperty.getOrDefault(ICEBERG_TABLE_CACHE_ENABLE,
null),
+ ICEBERG_TABLE_CACHE_ENABLE);
+
CacheSpec.checkLongProperty(catalogProperty.getOrDefault(ICEBERG_TABLE_CACHE_TTL_SECOND,
null),
+ -1L, ICEBERG_TABLE_CACHE_TTL_SECOND);
+
CacheSpec.checkLongProperty(catalogProperty.getOrDefault(ICEBERG_TABLE_CACHE_CAPACITY,
null),
+ 0L, ICEBERG_TABLE_CACHE_CAPACITY);
+
+
CacheSpec.checkBooleanProperty(catalogProperty.getOrDefault(ICEBERG_MANIFEST_CACHE_ENABLE,
null),
+ ICEBERG_MANIFEST_CACHE_ENABLE);
+
CacheSpec.checkLongProperty(catalogProperty.getOrDefault(ICEBERG_MANIFEST_CACHE_TTL_SECOND,
null),
+ -1L, ICEBERG_MANIFEST_CACHE_TTL_SECOND);
+
CacheSpec.checkLongProperty(catalogProperty.getOrDefault(ICEBERG_MANIFEST_CACHE_CAPACITY,
null),
+ 0L, ICEBERG_MANIFEST_CACHE_CAPACITY);
catalogProperty.checkMetaStoreAndStorageProperties(AbstractIcebergProperties.class);
}
@Override
public void notifyPropertiesUpdated(Map<String, String> updatedProps) {
super.notifyPropertiesUpdated(updatedProps);
- String tableMetaCacheTtl =
updatedProps.getOrDefault(ICEBERG_TABLE_META_CACHE_TTL_SECOND, null);
- if (Objects.nonNull(tableMetaCacheTtl)) {
-
Env.getCurrentEnv().getExtMetaCacheMgr().getIcebergMetadataCache(this).init();
- }
+ String tableCacheEnable =
updatedProps.getOrDefault(ICEBERG_TABLE_CACHE_ENABLE, null);
+ String tableCacheTtl =
updatedProps.getOrDefault(ICEBERG_TABLE_CACHE_TTL_SECOND, null);
+ String tableCacheCapacity =
updatedProps.getOrDefault(ICEBERG_TABLE_CACHE_CAPACITY, null);
String manifestCacheEnable =
updatedProps.getOrDefault(ICEBERG_MANIFEST_CACHE_ENABLE, null);
- String manifestCacheCapacity =
updatedProps.getOrDefault(ICEBERG_MANIFEST_CACHE_CAPACITY_MB, null);
+ String manifestCacheCapacity =
updatedProps.getOrDefault(ICEBERG_MANIFEST_CACHE_CAPACITY, null);
String manifestCacheTtl =
updatedProps.getOrDefault(ICEBERG_MANIFEST_CACHE_TTL_SECOND, null);
- if (Objects.nonNull(manifestCacheEnable) ||
Objects.nonNull(manifestCacheCapacity)
+ if (Objects.nonNull(tableCacheEnable) ||
Objects.nonNull(tableCacheTtl) || Objects.nonNull(tableCacheCapacity)
+ || Objects.nonNull(manifestCacheEnable) ||
Objects.nonNull(manifestCacheCapacity)
|| Objects.nonNull(manifestCacheTtl)) {
Env.getCurrentEnv().getExtMetaCacheMgr().getIcebergMetadataCache(this).init();
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataCache.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataCache.java
index 5f0c0700efe..40c8ba29184 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataCache.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataCache.java
@@ -28,6 +28,7 @@ import org.apache.doris.datasource.ExternalTable;
import org.apache.doris.datasource.NameMapping;
import org.apache.doris.datasource.hive.HMSExternalCatalog;
import org.apache.doris.datasource.iceberg.cache.IcebergManifestCache;
+import org.apache.doris.datasource.metacache.CacheSpec;
import org.apache.doris.mtmv.MTMVRelatedTableIf;
import com.github.benmanes.caffeine.cache.LoadingCache;
@@ -35,7 +36,6 @@ import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.commons.lang3.exception.ExceptionUtils;
-import org.apache.commons.lang3.math.NumberUtils;
import org.apache.iceberg.ManifestFiles;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
@@ -65,28 +65,38 @@ public class IcebergMetadataCache {
}
public void init() {
- long tableMetaCacheTtlSecond = NumberUtils.toLong(
-
catalog.getProperties().get(IcebergExternalCatalog.ICEBERG_TABLE_META_CACHE_TTL_SECOND),
- ExternalCatalog.CACHE_NO_TTL);
-
+ CacheSpec tableCacheSpec = resolveTableCacheSpec();
CacheFactory tableCacheFactory = new CacheFactory(
- OptionalLong.of(tableMetaCacheTtlSecond >=
ExternalCatalog.CACHE_TTL_DISABLE_CACHE
- ? tableMetaCacheTtlSecond :
Config.external_cache_expire_time_seconds_after_access),
- OptionalLong.of(Config.external_cache_refresh_time_minutes *
60),
- Config.max_external_table_cache_num,
+ CacheSpec.toExpireAfterAccess(tableCacheSpec.getTtlSecond()),
+ OptionalLong.empty(),
+ tableCacheSpec.getCapacity(),
true,
null);
this.tableCache =
tableCacheFactory.buildCache(this::loadTableCacheValue, executor);
this.viewCache = tableCacheFactory.buildCache(this::loadView,
executor);
- long manifestCacheCapacityMb = NumberUtils.toLong(
-
catalog.getProperties().get(IcebergExternalCatalog.ICEBERG_MANIFEST_CACHE_CAPACITY_MB),
-
IcebergExternalCatalog.DEFAULT_ICEBERG_MANIFEST_CACHE_CAPACITY_MB);
- manifestCacheCapacityMb = Math.max(manifestCacheCapacityMb, 0L);
- long manifestCacheTtlSec = NumberUtils.toLong(
-
catalog.getProperties().get(IcebergExternalCatalog.ICEBERG_MANIFEST_CACHE_TTL_SECOND),
-
IcebergExternalCatalog.DEFAULT_ICEBERG_MANIFEST_CACHE_TTL_SECOND);
- this.manifestCache = new IcebergManifestCache(manifestCacheCapacityMb,
manifestCacheTtlSec);
+ CacheSpec manifestCacheSpec = resolveManifestCacheSpec();
+ this.manifestCache = new
IcebergManifestCache(manifestCacheSpec.getCapacity(),
+ manifestCacheSpec.getTtlSecond());
+ }
+
+ private CacheSpec resolveTableCacheSpec() {
+ return CacheSpec.fromProperties(catalog.getProperties(),
+ IcebergExternalCatalog.ICEBERG_TABLE_CACHE_ENABLE, true,
+ IcebergExternalCatalog.ICEBERG_TABLE_CACHE_TTL_SECOND,
+ Config.external_cache_expire_time_seconds_after_access,
+ IcebergExternalCatalog.ICEBERG_TABLE_CACHE_CAPACITY,
+ Config.max_external_table_cache_num);
+ }
+
+ private CacheSpec resolveManifestCacheSpec() {
+ return CacheSpec.fromProperties(catalog.getProperties(),
+ IcebergExternalCatalog.ICEBERG_MANIFEST_CACHE_ENABLE,
+ IcebergExternalCatalog.DEFAULT_ICEBERG_MANIFEST_CACHE_ENABLE,
+ IcebergExternalCatalog.ICEBERG_MANIFEST_CACHE_TTL_SECOND,
+
IcebergExternalCatalog.DEFAULT_ICEBERG_MANIFEST_CACHE_TTL_SECOND,
+ IcebergExternalCatalog.ICEBERG_MANIFEST_CACHE_CAPACITY,
+
IcebergExternalCatalog.DEFAULT_ICEBERG_MANIFEST_CACHE_CAPACITY);
}
public Table getIcebergTable(ExternalTable dorisTable) {
@@ -169,74 +179,61 @@ public class IcebergMetadataCache {
}
public void invalidateCatalogCache(long catalogId) {
- tableCache.asMap().entrySet().stream()
- .filter(entry -> entry.getKey().nameMapping.getCtlId() ==
catalogId)
- .forEach(entry -> {
-
ManifestFiles.dropCache(entry.getValue().getIcebergTable().io());
- if (LOG.isDebugEnabled()) {
- LOG.info("invalidate iceberg table cache {} when
invalidating catalog cache",
- entry.getKey().nameMapping, new Exception());
- }
- tableCache.invalidate(entry.getKey());
- });
-
- viewCache.asMap().entrySet().stream()
- .filter(entry -> entry.getKey().nameMapping.getCtlId() ==
catalogId)
- .forEach(entry -> viewCache.invalidate(entry.getKey()));
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("invalidate all iceberg table cache when invalidating
catalog {}", catalogId);
+ }
+ // Invalidate all entries related to the catalog
+ tableCache.invalidateAll();
+ viewCache.invalidateAll();
manifestCache.invalidateAll();
}
public void invalidateTableCache(ExternalTable dorisTable) {
- long catalogId = dorisTable.getCatalog().getId();
+ IcebergMetadataCacheKey key =
IcebergMetadataCacheKey.of(dorisTable.getOrBuildNameMapping());
+ IcebergTableCacheValue tableCacheValue = tableCache.getIfPresent(key);
+ if (tableCacheValue != null) {
+ invalidateTableCache(key, tableCacheValue);
+ } else {
+ invalidateTableCacheByLocalName(dorisTable);
+ }
+ }
+
+ private void invalidateTableCache(IcebergMetadataCacheKey key,
IcebergTableCacheValue tableCacheValue) {
+ ManifestFiles.dropCache(tableCacheValue.getIcebergTable().io());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("invalidate iceberg table cache {}", key.nameMapping,
new Exception());
+ }
+ tableCache.invalidate(key);
+ viewCache.invalidate(key);
+ }
+
+ private void invalidateTableCacheByLocalName(ExternalTable dorisTable) {
String dbName = dorisTable.getDbName();
String tblName = dorisTable.getName();
tableCache.asMap().entrySet().stream()
- .filter(entry -> {
- IcebergMetadataCacheKey key = entry.getKey();
- return key.nameMapping.getCtlId() == catalogId
- && key.nameMapping.getLocalDbName().equals(dbName)
- &&
key.nameMapping.getLocalTblName().equals(tblName);
- })
- .forEach(entry -> {
-
ManifestFiles.dropCache(entry.getValue().getIcebergTable().io());
- if (LOG.isDebugEnabled()) {
- LOG.info("invalidate iceberg table cache {}",
- entry.getKey().nameMapping, new Exception());
- }
- tableCache.invalidate(entry.getKey());
- });
- viewCache.asMap().entrySet().stream()
- .filter(entry -> {
- IcebergMetadataCacheKey key = entry.getKey();
- return key.nameMapping.getCtlId() == catalogId
- && key.nameMapping.getLocalDbName().equals(dbName)
- &&
key.nameMapping.getLocalTblName().equals(tblName);
- })
- .forEach(entry -> viewCache.invalidate(entry.getKey()));
+ .filter(entry ->
entry.getKey().nameMapping.getLocalDbName().equals(dbName)
+ &&
entry.getKey().nameMapping.getLocalTblName().equals(tblName))
+ .forEach(entry -> invalidateTableCache(entry.getKey(),
entry.getValue()));
+ viewCache.asMap().keySet().stream()
+ .filter(key -> key.nameMapping.getLocalDbName().equals(dbName)
+ && key.nameMapping.getLocalTblName().equals(tblName))
+ .forEach(viewCache::invalidate);
}
public void invalidateDbCache(long catalogId, String dbName) {
tableCache.asMap().entrySet().stream()
- .filter(entry -> {
- IcebergMetadataCacheKey key = entry.getKey();
- return key.nameMapping.getCtlId() == catalogId
- && key.nameMapping.getLocalDbName().equals(dbName);
- })
+ .filter(entry ->
entry.getKey().nameMapping.getLocalDbName().equals(dbName))
.forEach(entry -> {
ManifestFiles.dropCache(entry.getValue().getIcebergTable().io());
if (LOG.isDebugEnabled()) {
- LOG.info("invalidate iceberg table cache {} when
invalidating db cache",
+ LOG.debug("invalidate iceberg table cache {} when
invalidating db cache",
entry.getKey().nameMapping, new Exception());
}
tableCache.invalidate(entry.getKey());
});
- viewCache.asMap().entrySet().stream()
- .filter(entry -> {
- IcebergMetadataCacheKey key = entry.getKey();
- return key.nameMapping.getCtlId() == catalogId
- && key.nameMapping.getLocalDbName().equals(dbName);
- })
- .forEach(entry -> viewCache.invalidate(entry.getKey()));
+ viewCache.asMap().keySet().stream()
+ .filter(key -> key.nameMapping.getLocalDbName().equals(dbName))
+ .forEach(viewCache::invalidate);
}
private static void initIcebergTableFileIO(Table table, Map<String,
String> props) {
@@ -260,6 +257,10 @@ public class IcebergMetadataCache {
this.nameMapping = nameMapping;
}
+ private static IcebergMetadataCacheKey of(NameMapping nameMapping) {
+ return new IcebergMetadataCacheKey(nameMapping);
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java
index e14a79cf3ea..75db779d157 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java
@@ -58,6 +58,7 @@ import org.apache.doris.datasource.ExternalTable;
import org.apache.doris.datasource.SchemaCacheValue;
import org.apache.doris.datasource.iceberg.cache.IcebergManifestCache;
import org.apache.doris.datasource.iceberg.source.IcebergTableQueryInfo;
+import org.apache.doris.datasource.metacache.CacheSpec;
import org.apache.doris.datasource.mvcc.MvccSnapshot;
import org.apache.doris.datasource.mvcc.MvccUtil;
import org.apache.doris.datasource.property.metastore.HMSBaseProperties;
@@ -1588,11 +1589,14 @@ public class IcebergUtils {
}
public static boolean isManifestCacheEnabled(ExternalCatalog catalog) {
- String enabled =
catalog.getProperties().get(IcebergExternalCatalog.ICEBERG_MANIFEST_CACHE_ENABLE);
- if (enabled == null) {
- return
IcebergExternalCatalog.DEFAULT_ICEBERG_MANIFEST_CACHE_ENABLE;
- }
- return Boolean.parseBoolean(enabled);
+ CacheSpec spec = CacheSpec.fromProperties(catalog.getProperties(),
+ IcebergExternalCatalog.ICEBERG_MANIFEST_CACHE_ENABLE,
+ IcebergExternalCatalog.DEFAULT_ICEBERG_MANIFEST_CACHE_ENABLE,
+ IcebergExternalCatalog.ICEBERG_MANIFEST_CACHE_TTL_SECOND,
+
IcebergExternalCatalog.DEFAULT_ICEBERG_MANIFEST_CACHE_TTL_SECOND,
+ IcebergExternalCatalog.ICEBERG_MANIFEST_CACHE_CAPACITY,
+
IcebergExternalCatalog.DEFAULT_ICEBERG_MANIFEST_CACHE_CAPACITY);
+ return CacheSpec.isCacheEnabled(spec.isEnable(), spec.getTtlSecond(),
spec.getCapacity());
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/cache/ContentFileEstimator.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/cache/ContentFileEstimator.java
deleted file mode 100644
index 112f161389b..00000000000
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/cache/ContentFileEstimator.java
+++ /dev/null
@@ -1,194 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.datasource.iceberg.cache;
-
-import org.apache.iceberg.ContentFile;
-import org.apache.iceberg.DeleteFile;
-import org.apache.iceberg.StructLike;
-
-import java.nio.ByteBuffer;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Utility to estimate the JVM weight of Iceberg {@link ContentFile} objects.
- */
-public final class ContentFileEstimator {
- private static final long LIST_BASE_WEIGHT = 48L;
- private static final long OBJECT_REFERENCE_WEIGHT = 8L;
- private static final long CONTENT_FILE_BASE_WEIGHT = 256L;
- private static final long STRING_BASE_WEIGHT = 40L;
- private static final long CHAR_BYTES = 2L;
- private static final long BYTE_BUFFER_BASE_WEIGHT = 16L;
- private static final long MAP_BASE_WEIGHT = 48L;
- private static final long MAP_ENTRY_OVERHEAD = 24L;
- private static final long LONG_OBJECT_WEIGHT = 24L;
- private static final long INT_OBJECT_WEIGHT = 16L;
- private static final long PARTITION_BASE_WEIGHT = 48L;
- private static final long PARTITION_VALUE_BASE_WEIGHT = 8L;
-
- private ContentFileEstimator() {
- }
-
- public static long estimate(List<? extends ContentFile<?>> files) {
- return listReferenceWeight(files) + estimateContentFilesWeight(files);
- }
-
- private static long listReferenceWeight(List<?> files) {
- if (files == null || files.isEmpty()) {
- return 0L;
- }
- return LIST_BASE_WEIGHT + (long) files.size() *
OBJECT_REFERENCE_WEIGHT;
- }
-
- private static long estimateContentFilesWeight(List<? extends
ContentFile<?>> files) {
- long total = 0L;
- if (files == null) {
- return 0L;
- }
- for (ContentFile<?> file : files) {
- total += estimateContentFileWeight(file);
- }
- return total;
- }
-
- private static long estimateContentFileWeight(ContentFile<?> file) {
- if (file == null) {
- return 0L;
- }
-
- long weight = CONTENT_FILE_BASE_WEIGHT;
- weight += charSequenceWeight(file.path());
- weight += stringWeight(file.manifestLocation());
- weight += byteBufferWeight(file.keyMetadata());
- weight += partitionWeight(file.partition());
-
- weight += numericMapWeight(file.columnSizes());
- weight += numericMapWeight(file.valueCounts());
- weight += numericMapWeight(file.nullValueCounts());
- weight += numericMapWeight(file.nanValueCounts());
- weight += byteBufferMapWeight(file.lowerBounds());
- weight += byteBufferMapWeight(file.upperBounds());
-
- weight += listWeight(file.splitOffsets(), LONG_OBJECT_WEIGHT);
- weight += listWeight(file.equalityFieldIds(), INT_OBJECT_WEIGHT);
-
- weight += optionalLongWeight(file.pos());
- weight += optionalLongWeight(file.dataSequenceNumber());
- weight += optionalLongWeight(file.fileSequenceNumber());
- weight += optionalLongWeight(file.firstRowId());
- weight += optionalIntWeight(file.sortOrderId());
-
- if (file instanceof DeleteFile) {
- DeleteFile deleteFile = (DeleteFile) file;
- weight += stringWeight(deleteFile.referencedDataFile());
- weight += optionalLongWeight(deleteFile.contentOffset());
- weight += optionalLongWeight(deleteFile.contentSizeInBytes());
- }
-
- return weight;
- }
-
- private static long listWeight(List<? extends Number> list, long
elementWeight) {
- if (list == null || list.isEmpty()) {
- return 0L;
- }
- return LIST_BASE_WEIGHT + (long) list.size() *
(OBJECT_REFERENCE_WEIGHT + elementWeight);
- }
-
- private static long numericMapWeight(Map<Integer, Long> map) {
- if (map == null || map.isEmpty()) {
- return 0L;
- }
- return MAP_BASE_WEIGHT + (long) map.size() * (MAP_ENTRY_OVERHEAD +
LONG_OBJECT_WEIGHT);
- }
-
- private static long byteBufferMapWeight(Map<Integer, ByteBuffer> map) {
- if (map == null || map.isEmpty()) {
- return 0L;
- }
- long weight = MAP_BASE_WEIGHT + (long) map.size() * MAP_ENTRY_OVERHEAD;
- for (ByteBuffer buffer : map.values()) {
- weight += byteBufferWeight(buffer);
- }
- return weight;
- }
-
- private static long partitionWeight(StructLike partition) {
- if (partition == null) {
- return 0L;
- }
- long weight = PARTITION_BASE_WEIGHT + (long) partition.size() *
PARTITION_VALUE_BASE_WEIGHT;
- for (int i = 0; i < partition.size(); i++) {
- Object value = partition.get(i, Object.class);
- weight += estimateValueWeight(value);
- }
- return weight;
- }
-
- private static long estimateValueWeight(Object value) {
- if (value == null) {
- return 0L;
- }
- if (value instanceof CharSequence) {
- return charSequenceWeight((CharSequence) value);
- } else if (value instanceof byte[]) {
- return BYTE_BUFFER_BASE_WEIGHT + ((byte[]) value).length;
- } else if (value instanceof ByteBuffer) {
- return byteBufferWeight((ByteBuffer) value);
- } else if (value instanceof Long || value instanceof Double) {
- return LONG_OBJECT_WEIGHT;
- } else if (value instanceof Integer || value instanceof Float) {
- return INT_OBJECT_WEIGHT;
- } else if (value instanceof Short || value instanceof Character) {
- return 4L;
- } else if (value instanceof Boolean) {
- return 1L;
- }
- return OBJECT_REFERENCE_WEIGHT;
- }
-
- private static long charSequenceWeight(CharSequence value) {
- if (value == null) {
- return 0L;
- }
- return STRING_BASE_WEIGHT + (long) value.length() * CHAR_BYTES;
- }
-
- private static long stringWeight(String value) {
- if (value == null) {
- return 0L;
- }
- return STRING_BASE_WEIGHT + (long) value.length() * CHAR_BYTES;
- }
-
- private static long byteBufferWeight(ByteBuffer buffer) {
- if (buffer == null) {
- return 0L;
- }
- return BYTE_BUFFER_BASE_WEIGHT + buffer.remaining();
- }
-
- private static long optionalLongWeight(Long value) {
- return value == null ? 0L : LONG_OBJECT_WEIGHT;
- }
-
- private static long optionalIntWeight(Integer value) {
- return value == null ? 0L : INT_OBJECT_WEIGHT;
- }
-}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/cache/IcebergManifestCache.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/cache/IcebergManifestCache.java
index 6c5d79ecb69..6016b2ab7e9 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/cache/IcebergManifestCache.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/cache/IcebergManifestCache.java
@@ -17,16 +17,15 @@
package org.apache.doris.datasource.iceberg.cache;
+import org.apache.doris.common.CacheFactory;
import org.apache.doris.datasource.CacheException;
+import org.apache.doris.datasource.metacache.CacheSpec;
import com.github.benmanes.caffeine.cache.CacheLoader;
-import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
-import com.github.benmanes.caffeine.cache.Weigher;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
-import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.Callable;
@@ -38,20 +37,14 @@ public class IcebergManifestCache {
private final LoadingCache<ManifestCacheKey, ManifestCacheValue> cache;
- public IcebergManifestCache(long capacityMb, long ttlSec) {
- long capacityInBytes = capacityMb * 1024L * 1024L;
- Weigher<ManifestCacheKey, ManifestCacheValue> weigher = (key, value)
-> {
- long weight =
Optional.ofNullable(value).map(ManifestCacheValue::getWeightBytes).orElse(0L);
- if (weight > Integer.MAX_VALUE) {
- return Integer.MAX_VALUE;
- }
- return (int) weight;
- };
- Caffeine<ManifestCacheKey, ManifestCacheValue> builder =
Caffeine.newBuilder()
- .maximumWeight(capacityInBytes)
- .weigher(weigher)
- .expireAfterAccess(Duration.ofSeconds(ttlSec));
- cache = builder.build(new CacheLoader<ManifestCacheKey,
ManifestCacheValue>() {
+ public IcebergManifestCache(long capacity, long ttlSec) {
+ CacheFactory cacheFactory = new CacheFactory(
+ CacheSpec.toExpireAfterAccess(ttlSec),
+ java.util.OptionalLong.empty(),
+ capacity,
+ true,
+ null);
+ cache = cacheFactory.buildCache(new CacheLoader<ManifestCacheKey,
ManifestCacheValue>() {
@Override
public ManifestCacheValue load(ManifestCacheKey key) {
throw new CacheException("Manifest cache loader should be
provided explicitly for key %s", null, key);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/cache/ManifestCacheValue.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/cache/ManifestCacheValue.java
index 91e2f6db72f..e98ca6b2fb2 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/cache/ManifestCacheValue.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/cache/ManifestCacheValue.java
@@ -24,27 +24,23 @@ import java.util.Collections;
import java.util.List;
/**
- * Cached manifest payload containing parsed files and an estimated weight.
+ * Cached manifest payload containing parsed files.
*/
public class ManifestCacheValue {
private final List<DataFile> dataFiles;
private final List<DeleteFile> deleteFiles;
- private final long weightBytes;
- private ManifestCacheValue(List<DataFile> dataFiles, List<DeleteFile>
deleteFiles, long weightBytes) {
+ private ManifestCacheValue(List<DataFile> dataFiles, List<DeleteFile>
deleteFiles) {
this.dataFiles = dataFiles == null ? Collections.emptyList() :
dataFiles;
this.deleteFiles = deleteFiles == null ? Collections.emptyList() :
deleteFiles;
- this.weightBytes = weightBytes;
}
public static ManifestCacheValue forDataFiles(List<DataFile> dataFiles) {
- return new ManifestCacheValue(dataFiles, Collections.emptyList(),
- estimateWeight(dataFiles, Collections.emptyList()));
+ return new ManifestCacheValue(dataFiles, Collections.emptyList());
}
public static ManifestCacheValue forDeleteFiles(List<DeleteFile>
deleteFiles) {
- return new ManifestCacheValue(Collections.emptyList(), deleteFiles,
- estimateWeight(Collections.emptyList(), deleteFiles));
+ return new ManifestCacheValue(Collections.emptyList(), deleteFiles);
}
public List<DataFile> getDataFiles() {
@@ -54,12 +50,4 @@ public class ManifestCacheValue {
public List<DeleteFile> getDeleteFiles() {
return deleteFiles;
}
-
- public long getWeightBytes() {
- return weightBytes;
- }
-
- private static long estimateWeight(List<DataFile> dataFiles,
List<DeleteFile> deleteFiles) {
- return ContentFileEstimator.estimate(dataFiles) +
ContentFileEstimator.estimate(deleteFiles);
- }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/metacache/CacheSpec.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/metacache/CacheSpec.java
new file mode 100644
index 00000000000..ca0f1be330d
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/metacache/CacheSpec.java
@@ -0,0 +1,152 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.metacache;
+
+import org.apache.doris.common.DdlException;
+
+import org.apache.commons.lang3.math.NumberUtils;
+
+import java.util.Map;
+import java.util.OptionalLong;
+
+/**
+ * Common cache specification for external metadata caches.
+ *
+ * <p>Semantics:
+ * <ul>
+ * <li>enable=false disables cache</li>
+ * <li>ttlSecond=0 disables cache, ttlSecond=-1 means no expiration</li>
+ * <li>capacity=0 disables cache; capacity is count-based</li>
+ * </ul>
+ */
+public final class CacheSpec {
+ public static final long CACHE_NO_TTL = -1L;
+ public static final long CACHE_TTL_DISABLE_CACHE = 0L;
+
+ private final boolean enable;
+ private final long ttlSecond;
+ private final long capacity;
+
+ private CacheSpec(boolean enable, long ttlSecond, long capacity) {
+ this.enable = enable;
+ this.ttlSecond = ttlSecond;
+ this.capacity = capacity;
+ }
+
+ public static CacheSpec fromProperties(Map<String, String> properties,
+ String enableKey, boolean defaultEnable,
+ String ttlKey, long defaultTtlSecond,
+ String capacityKey, long defaultCapacity) {
+ boolean enable = getBooleanProperty(properties, enableKey,
defaultEnable);
+ long ttlSecond = getLongProperty(properties, ttlKey, defaultTtlSecond);
+ long capacity = getLongProperty(properties, capacityKey,
defaultCapacity);
+ if (!isCacheEnabled(enable, ttlSecond, capacity)) {
+ capacity = 0;
+ }
+ return new CacheSpec(enable, ttlSecond, capacity);
+ }
+
+ /**
+ * Build a cache spec from a ttl property value and fixed capacity.
+ *
+ * <p>Semantics are compatible with legacy schema cache behavior:
+ * <ul>
+ * <li>ttlValue is null: use default ttl</li>
+ * <li>ttl=-1: no expiration</li>
+ * <li>ttl=0: disable cache by forcing capacity=0</li>
+ * <li>ttl parse failure: fallback to -1 (no expiration)</li>
+ * </ul>
+ * TODO: Refactor schema cache and its parameters to the unified
enable/ttl/capacity model,
+ * then remove this ttl-only adapter.
+ */
+ public static CacheSpec fromTtlValue(String ttlValue, long
defaultTtlSecond, long defaultCapacity) {
+ long ttlSecond = ttlValue == null ? defaultTtlSecond :
NumberUtils.toLong(ttlValue, CACHE_NO_TTL);
+ long capacity = defaultCapacity;
+ if (!isCacheEnabled(true, ttlSecond, capacity)) {
+ capacity = 0;
+ }
+ return new CacheSpec(true, ttlSecond, capacity);
+ }
+
+ public static void checkBooleanProperty(String value, String key) throws
DdlException {
+ if (value == null) {
+ return;
+ }
+ if (!value.equalsIgnoreCase("true") &&
!value.equalsIgnoreCase("false")) {
+ throw new DdlException("The parameter " + key + " is wrong, value
is " + value);
+ }
+ }
+
+ public static void checkLongProperty(String value, long minValue, String
key) throws DdlException {
+ if (value == null) {
+ return;
+ }
+ long parsed;
+ try {
+ parsed = Long.parseLong(value);
+ } catch (NumberFormatException e) {
+ throw new DdlException("The parameter " + key + " is wrong, value
is " + value);
+ }
+ if (parsed < minValue) {
+ throw new DdlException("The parameter " + key + " is wrong, value
is " + value);
+ }
+ }
+
+ public static boolean isCacheEnabled(boolean enable, long ttlSecond, long
capacity) {
+ return enable && ttlSecond != 0 && capacity != 0;
+ }
+
+ /**
+ * Convert ttlSecond to OptionalLong for CacheFactory.
+ * ttlSecond=-1 means no expiration; ttlSecond=0 disables cache.
+ */
+ public static OptionalLong toExpireAfterAccess(long ttlSecond) {
+ if (ttlSecond == CACHE_NO_TTL) {
+ return OptionalLong.empty();
+ }
+ return OptionalLong.of(Math.max(ttlSecond, CACHE_TTL_DISABLE_CACHE));
+ }
+
+ private static boolean getBooleanProperty(Map<String, String> properties,
String key, boolean defaultValue) {
+ String value = properties.get(key);
+ if (value == null) {
+ return defaultValue;
+ }
+ return Boolean.parseBoolean(value);
+ }
+
+ private static long getLongProperty(Map<String, String> properties, String
key, long defaultValue) {
+ String value = properties.get(key);
+ if (value == null) {
+ return defaultValue;
+ }
+ return NumberUtils.toLong(value, defaultValue);
+ }
+
+ public boolean isEnable() {
+ return enable;
+ }
+
+ public long getTtlSecond() {
+ return ttlSecond;
+ }
+
+ public long getCapacity() {
+ return capacity;
+ }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalCatalog.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalCatalog.java
index 2dd3c0c8c6b..b6a06fd4670 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalCatalog.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalCatalog.java
@@ -23,6 +23,7 @@ import org.apache.doris.datasource.ExternalCatalog;
import org.apache.doris.datasource.InitCatalogLog;
import org.apache.doris.datasource.NameMapping;
import org.apache.doris.datasource.SessionContext;
+import org.apache.doris.datasource.metacache.CacheSpec;
import org.apache.doris.datasource.property.metastore.AbstractPaimonProperties;
import org.apache.commons.lang3.exception.ExceptionUtils;
@@ -36,6 +37,7 @@ import org.apache.paimon.partition.Partition;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
// The subclasses of this class are all deprecated, only for meta persistence
compatibility.
public class PaimonExternalCatalog extends ExternalCatalog {
@@ -45,6 +47,9 @@ public class PaimonExternalCatalog extends ExternalCatalog {
public static final String PAIMON_HMS = "hms";
public static final String PAIMON_DLF = "dlf";
public static final String PAIMON_REST = "rest";
+ public static final String PAIMON_TABLE_CACHE_ENABLE =
"meta.cache.paimon.table.enable";
+ public static final String PAIMON_TABLE_CACHE_TTL_SECOND =
"meta.cache.paimon.table.ttl-second";
+ public static final String PAIMON_TABLE_CACHE_CAPACITY =
"meta.cache.paimon.table.capacity";
protected String catalogType;
protected Catalog catalog;
@@ -188,9 +193,27 @@ public class PaimonExternalCatalog extends ExternalCatalog
{
@Override
public void checkProperties() throws DdlException {
super.checkProperties();
+
CacheSpec.checkBooleanProperty(catalogProperty.getOrDefault(PAIMON_TABLE_CACHE_ENABLE,
null),
+ PAIMON_TABLE_CACHE_ENABLE);
+
CacheSpec.checkLongProperty(catalogProperty.getOrDefault(PAIMON_TABLE_CACHE_TTL_SECOND,
null),
+ -1L, PAIMON_TABLE_CACHE_TTL_SECOND);
+
CacheSpec.checkLongProperty(catalogProperty.getOrDefault(PAIMON_TABLE_CACHE_CAPACITY,
null),
+ 0L, PAIMON_TABLE_CACHE_CAPACITY);
catalogProperty.checkMetaStoreAndStorageProperties(AbstractPaimonProperties.class);
}
+ @Override
+ public void notifyPropertiesUpdated(Map<String, String> updatedProps) {
+ super.notifyPropertiesUpdated(updatedProps);
+ String tableCacheEnable =
updatedProps.getOrDefault(PAIMON_TABLE_CACHE_ENABLE, null);
+ String tableCacheTtl =
updatedProps.getOrDefault(PAIMON_TABLE_CACHE_TTL_SECOND, null);
+ String tableCacheCapacity =
updatedProps.getOrDefault(PAIMON_TABLE_CACHE_CAPACITY, null);
+ if (Objects.nonNull(tableCacheEnable) || Objects.nonNull(tableCacheTtl)
+ || Objects.nonNull(tableCacheCapacity)) {
+ PaimonUtils.getPaimonMetadataCache(this).init();
+ }
+ }
+
@Override
public void onClose() {
super.onClose();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java
index 174bfa64a2d..1782bae59b0 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java
@@ -20,7 +20,6 @@ package org.apache.doris.datasource.paimon;
import org.apache.doris.analysis.TableScanParams;
import org.apache.doris.analysis.TableSnapshot;
import org.apache.doris.catalog.Column;
-import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.MTMV;
import org.apache.doris.catalog.PartitionItem;
import org.apache.doris.catalog.PartitionType;
@@ -76,12 +75,9 @@ public class PaimonExternalTable extends ExternalTable
implements MTMVRelatedTab
private static final Logger LOG =
LogManager.getLogger(PaimonExternalTable.class);
- private final Table paimonTable;
-
public PaimonExternalTable(long id, String name, String remoteName,
PaimonExternalCatalog catalog,
PaimonExternalDatabase db) {
super(id, name, remoteName, catalog, db,
TableType.PAIMON_EXTERNAL_TABLE);
- this.paimonTable = catalog.getPaimonTable(getOrBuildNameMapping());
}
public String getPaimonCatalogType() {
@@ -96,7 +92,13 @@ public class PaimonExternalTable extends ExternalTable
implements MTMVRelatedTab
}
public Table getPaimonTable(Optional<MvccSnapshot> snapshot) {
- return getOrFetchSnapshotCacheValue(snapshot).getSnapshot().getTable();
+ if (snapshot.isPresent()) {
+ // MTMV scenario: get from snapshot cache
+ return
getOrFetchSnapshotCacheValue(snapshot).getSnapshot().getTable();
+ } else {
+ // Normal query scenario: get directly from table cache
+ return PaimonUtils.getPaimonTable(this);
+ }
}
private PaimonSnapshotCacheValue
getPaimonSnapshotCacheValue(Optional<TableSnapshot> tableSnapshot,
@@ -109,7 +111,8 @@ public class PaimonExternalTable extends ExternalTable
implements MTMVRelatedTab
// use the specified snapshot and the corresponding schema(not the
latest
// schema).
try {
- DataTable dataTable = (DataTable) paimonTable;
+ Table baseTable = getBasePaimonTable();
+ DataTable dataTable = (DataTable) baseTable;
Snapshot snapshot;
Map<String, String> scanOptions = new HashMap<>();
@@ -135,18 +138,19 @@ public class PaimonExternalTable extends ExternalTable
implements MTMVRelatedTab
scanOptions.put(CoreOptions.SCAN_TAG_NAME.key(), tagName);
}
- Table scanTable = paimonTable.copy(scanOptions);
+ Table scanTable = baseTable.copy(scanOptions);
return new PaimonSnapshotCacheValue(PaimonPartitionInfo.EMPTY,
new PaimonSnapshot(snapshot.id(), snapshot.schemaId(),
scanTable));
} catch (Exception e) {
- LOG.warn("Failed to get Paimon snapshot for table {}",
paimonTable.name(), e);
+ LOG.warn("Failed to get Paimon snapshot for table {}",
getOrBuildNameMapping().getFullLocalName(), e);
throw new RuntimeException(
"Failed to get Paimon snapshot: " + (e.getMessage() ==
null ? "unknown cause" : e.getMessage()),
e);
}
} else if (scanParams.isPresent() && scanParams.get().isBranch()) {
try {
- String branch =
PaimonUtil.resolvePaimonBranch(scanParams.get(), paimonTable);
+ Table baseTable = getBasePaimonTable();
+ String branch =
PaimonUtil.resolvePaimonBranch(scanParams.get(), baseTable);
Table table = ((PaimonExternalCatalog)
catalog).getPaimonTable(getOrBuildNameMapping(), branch, null);
Optional<Snapshot> latestSnapshot = table.latestSnapshot();
long latestSnapshotId = PaimonSnapshot.INVALID_SNAPSHOT_ID;
@@ -160,15 +164,14 @@ public class PaimonExternalTable extends ExternalTable
implements MTMVRelatedTab
return new PaimonSnapshotCacheValue(PaimonPartitionInfo.EMPTY,
new PaimonSnapshot(latestSnapshotId, schemaId,
dataTable));
} catch (Exception e) {
- LOG.warn("Failed to get Paimon branch for table {}",
paimonTable.name(), e);
+ LOG.warn("Failed to get Paimon branch for table {}",
getOrBuildNameMapping().getFullLocalName(), e);
throw new RuntimeException(
"Failed to get Paimon branch: " + (e.getMessage() ==
null ? "unknown cause" : e.getMessage()),
e);
}
} else {
// Otherwise, use the latest snapshot and the latest schema.
- return
Env.getCurrentEnv().getExtMetaCacheMgr().getPaimonMetadataCache()
- .getPaimonSnapshot(this);
+ return PaimonUtils.getLatestSnapshotCacheValue(this);
}
}
@@ -201,7 +204,7 @@ public class PaimonExternalTable extends ExternalTable
implements MTMVRelatedTab
public long fetchRowCount() {
makeSureInitialized();
long rowCount = 0;
- List<Split> splits =
paimonTable.newReadBuilder().newScan().plan().splits();
+ List<Split> splits =
getBasePaimonTable().newReadBuilder().newScan().plan().splits();
for (Split split : splits) {
rowCount += split.rowCount();
}
@@ -320,7 +323,7 @@ public class PaimonExternalTable extends ExternalTable
implements MTMVRelatedTab
makeSureInitialized();
PaimonSchemaCacheKey paimonSchemaCacheKey = (PaimonSchemaCacheKey) key;
try {
- Table table = ((PaimonExternalCatalog)
getCatalog()).getPaimonTable(getOrBuildNameMapping());
+ Table table = getBasePaimonTable();
TableSchema tableSchema = ((DataTable)
table).schemaManager().schema(paimonSchemaCacheKey.getSchemaId());
List<DataField> columns = tableSchema.fields();
List<Column> dorisColumns =
Lists.newArrayListWithCapacity(columns.size());
@@ -353,15 +356,15 @@ public class PaimonExternalTable extends ExternalTable
implements MTMVRelatedTab
private PaimonSchemaCacheValue
getPaimonSchemaCacheValue(Optional<MvccSnapshot> snapshot) {
PaimonSnapshotCacheValue snapshotCacheValue =
getOrFetchSnapshotCacheValue(snapshot);
- return
Env.getCurrentEnv().getExtMetaCacheMgr().getPaimonMetadataCache()
- .getPaimonSchemaCacheValue(getOrBuildNameMapping(),
snapshotCacheValue.getSnapshot().getSchemaId());
+ return PaimonUtils.getSchemaCacheValue(this, snapshotCacheValue);
}
private PaimonSnapshotCacheValue
getOrFetchSnapshotCacheValue(Optional<MvccSnapshot> snapshot) {
if (snapshot.isPresent()) {
return ((PaimonMvccSnapshot)
snapshot.get()).getSnapshotCacheValue();
} else {
- return getPaimonSnapshotCacheValue(Optional.empty(),
Optional.empty());
+ // Use new lazy-loading snapshot cache API
+ return PaimonUtils.getSnapshotCacheValue(snapshot, this);
}
}
@@ -373,13 +376,14 @@ public class PaimonExternalTable extends ExternalTable
implements MTMVRelatedTab
@Override
public String getComment() {
- return paimonTable.comment().isPresent() ? paimonTable.comment().get()
: "";
+ Table table = getBasePaimonTable();
+ return table.comment().isPresent() ? table.comment().get() : "";
}
public Map<String, String> getTableProperties() {
-
- if (paimonTable instanceof DataTable) {
- DataTable dataTable = (DataTable) paimonTable;
+ Table table = getBasePaimonTable();
+ if (table instanceof DataTable) {
+ DataTable dataTable = (DataTable) table;
Map<String, String> properties = new
LinkedHashMap<>(dataTable.coreOptions().toMap());
if (!dataTable.primaryKeys().isEmpty()) {
@@ -395,6 +399,10 @@ public class PaimonExternalTable extends ExternalTable
implements MTMVRelatedTab
@Override
public boolean isPartitionedTable() {
makeSureInitialized();
- return !paimonTable.partitionKeys().isEmpty();
+ return !getBasePaimonTable().partitionKeys().isEmpty();
+ }
+
+ private Table getBasePaimonTable() {
+ return PaimonUtils.getPaimonTable(this);
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonMetadataCache.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonMetadataCache.java
index fc58a7f15eb..7f118490fdd 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonMetadataCache.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonMetadataCache.java
@@ -29,6 +29,7 @@ import org.apache.doris.datasource.ExternalSchemaCache;
import org.apache.doris.datasource.ExternalTable;
import org.apache.doris.datasource.NameMapping;
import org.apache.doris.datasource.SchemaCacheValue;
+import org.apache.doris.datasource.metacache.CacheSpec;
import com.github.benmanes.caffeine.cache.LoadingCache;
import com.google.common.collect.Maps;
@@ -51,31 +52,65 @@ import java.util.concurrent.ExecutorService;
public class PaimonMetadataCache {
- private final LoadingCache<PaimonSnapshotCacheKey,
PaimonSnapshotCacheValue> snapshotCache;
+ private final ExecutorService executor;
+ private final ExternalCatalog catalog;
+ private LoadingCache<PaimonTableCacheKey, PaimonTableCacheValue>
tableCache;
- public PaimonMetadataCache(ExecutorService executor) {
- CacheFactory snapshotCacheFactory = new CacheFactory(
-
OptionalLong.of(Config.external_cache_expire_time_seconds_after_access),
- OptionalLong.of(Config.external_cache_refresh_time_minutes *
60),
- Config.max_external_table_cache_num,
+ public PaimonMetadataCache(ExternalCatalog catalog, ExecutorService
executor) {
+ this.executor = executor;
+ this.catalog = catalog;
+ init();
+ }
+
+ public void init() {
+ CacheSpec cacheSpec = resolveTableCacheSpec();
+ CacheFactory tableCacheFactory = new CacheFactory(
+ CacheSpec.toExpireAfterAccess(cacheSpec.getTtlSecond()),
+ OptionalLong.empty(),
+ cacheSpec.getCapacity(),
true,
null);
- this.snapshotCache = snapshotCacheFactory.buildCache(key ->
loadSnapshot(key), executor);
+ this.tableCache = tableCacheFactory.buildCache(key ->
loadTableCacheValue(key), executor);
+ }
+
+ private CacheSpec resolveTableCacheSpec() {
+ return CacheSpec.fromProperties(catalog.getProperties(),
+ PaimonExternalCatalog.PAIMON_TABLE_CACHE_ENABLE, true,
+ PaimonExternalCatalog.PAIMON_TABLE_CACHE_TTL_SECOND,
+ Config.external_cache_expire_time_seconds_after_access,
+ PaimonExternalCatalog.PAIMON_TABLE_CACHE_CAPACITY,
+ Config.max_external_table_cache_num);
}
@NotNull
- private PaimonSnapshotCacheValue loadSnapshot(PaimonSnapshotCacheKey key) {
+ private PaimonTableCacheValue loadTableCacheValue(PaimonTableCacheKey key)
{
NameMapping nameMapping = key.getNameMapping();
try {
- PaimonSnapshot latestSnapshot = loadLatestSnapshot(key);
+ PaimonExternalCatalog externalCatalog = (PaimonExternalCatalog)
Env.getCurrentEnv().getCatalogMgr()
+ .getCatalogOrException(nameMapping.getCtlId(),
+ id -> new IOException("Catalog not found: " + id));
+ Table table = externalCatalog.getPaimonTable(nameMapping);
+ return new PaimonTableCacheValue(table);
+ } catch (Exception e) {
+ throw new CacheException("failed to load paimon table %s.%s.%s:
%s",
+ e, nameMapping.getCtlId(), nameMapping.getLocalDbName(),
+ nameMapping.getLocalTblName(), e.getMessage());
+ }
+ }
+
+ @NotNull
+ private PaimonSnapshotCacheValue loadSnapshot(ExternalTable dorisTable,
Table paimonTable) {
+ NameMapping nameMapping = dorisTable.getOrBuildNameMapping();
+ try {
+ PaimonSnapshot latestSnapshot = loadLatestSnapshot(paimonTable,
nameMapping);
List<Column> partitionColumns =
getPaimonSchemaCacheValue(nameMapping,
latestSnapshot.getSchemaId()).getPartitionColumns();
- PaimonPartitionInfo partitionInfo = loadPartitionInfo(key,
partitionColumns);
+ PaimonPartitionInfo partitionInfo = loadPartitionInfo(nameMapping,
partitionColumns);
return new PaimonSnapshotCacheValue(partitionInfo, latestSnapshot);
} catch (Exception e) {
- throw new CacheException("failed to load paimon snapshot %s.%s.%s
or reason: %s",
- e, nameMapping.getCtlId(), nameMapping.getLocalDbName(),
nameMapping.getLocalTblName(),
- e.getMessage());
+ throw new CacheException("failed to load paimon snapshot %s.%s.%s:
%s",
+ e, nameMapping.getCtlId(), nameMapping.getLocalDbName(),
+ nameMapping.getLocalTblName(), e.getMessage());
}
}
@@ -97,67 +132,96 @@ public class PaimonMetadataCache {
return (PaimonSchemaCacheValue) schemaCacheValue.get();
}
- private PaimonPartitionInfo loadPartitionInfo(PaimonSnapshotCacheKey key,
List<Column> partitionColumns)
+ private PaimonPartitionInfo loadPartitionInfo(NameMapping nameMapping,
List<Column> partitionColumns)
throws AnalysisException {
if (CollectionUtils.isEmpty(partitionColumns)) {
return PaimonPartitionInfo.EMPTY;
}
- NameMapping nameMapping = key.getNameMapping();
PaimonExternalCatalog externalCatalog = (PaimonExternalCatalog)
Env.getCurrentEnv().getCatalogMgr()
.getCatalogOrAnalysisException(nameMapping.getCtlId());
List<Partition> paimonPartitions =
externalCatalog.getPaimonPartitions(nameMapping);
return PaimonUtil.generatePartitionInfo(partitionColumns,
paimonPartitions);
}
- private PaimonSnapshot loadLatestSnapshot(PaimonSnapshotCacheKey key)
throws IOException {
- NameMapping nameMapping = key.getNameMapping();
- PaimonExternalCatalog externalCatalog = (PaimonExternalCatalog)
Env.getCurrentEnv().getCatalogMgr()
- .getCatalogOrException(nameMapping.getCtlId(), id -> new
IOException("Catalog not found: " + id));
- Table table = externalCatalog.getPaimonTable(nameMapping);
- Table snapshotTable = table;
+ private PaimonSnapshot loadLatestSnapshot(Table paimonTable, NameMapping
nameMapping) {
+ Table snapshotTable = paimonTable;
// snapshotId and schemaId
Long latestSnapshotId = PaimonSnapshot.INVALID_SNAPSHOT_ID;
- Optional<Snapshot> optionalSnapshot = table.latestSnapshot();
+ Optional<Snapshot> optionalSnapshot = paimonTable.latestSnapshot();
if (optionalSnapshot.isPresent()) {
latestSnapshotId = optionalSnapshot.get().id();
- snapshotTable =
-
table.copy(Collections.singletonMap(CoreOptions.SCAN_SNAPSHOT_ID.key(),
latestSnapshotId.toString()));
+ snapshotTable = paimonTable.copy(
+
Collections.singletonMap(CoreOptions.SCAN_SNAPSHOT_ID.key(),
latestSnapshotId.toString()));
}
- DataTable dataTable = (DataTable) table;
+ DataTable dataTable = (DataTable) paimonTable;
long latestSchemaId =
dataTable.schemaManager().latest().map(TableSchema::id).orElse(0L);
return new PaimonSnapshot(latestSnapshotId, latestSchemaId,
snapshotTable);
}
+ public Table getPaimonTable(ExternalTable dorisTable) {
+ PaimonTableCacheKey key = new
PaimonTableCacheKey(dorisTable.getOrBuildNameMapping());
+ return tableCache.get(key).getPaimonTable();
+ }
+
+ public Table getPaimonTable(PaimonTableCacheKey key) {
+ return tableCache.get(key).getPaimonTable();
+ }
+
+ public PaimonSnapshotCacheValue getSnapshotCache(ExternalTable dorisTable)
{
+ PaimonTableCacheKey key = new
PaimonTableCacheKey(dorisTable.getOrBuildNameMapping());
+ PaimonTableCacheValue tableCacheValue = tableCache.get(key);
+ return tableCacheValue.getSnapshotCacheValue(() ->
loadSnapshot(dorisTable,
+ tableCacheValue.getPaimonTable()));
+ }
+
public void invalidateCatalogCache(long catalogId) {
- snapshotCache.asMap().keySet().stream()
- .filter(key -> key.getNameMapping().getCtlId() == catalogId)
- .forEach(snapshotCache::invalidate);
+ tableCache.invalidateAll();
}
public void invalidateTableCache(ExternalTable dorisTable) {
- snapshotCache.asMap().keySet().stream()
- .filter(key -> key.getNameMapping().getCtlId() ==
dorisTable.getCatalog().getId()
- &&
key.getNameMapping().getLocalDbName().equals(dorisTable.getDbName())
- &&
key.getNameMapping().getLocalTblName().equals(dorisTable.getName()))
- .forEach(snapshotCache::invalidate);
+ PaimonTableCacheKey key = new
PaimonTableCacheKey(dorisTable.getOrBuildNameMapping());
+ tableCache.invalidate(key);
}
public void invalidateDbCache(long catalogId, String dbName) {
- snapshotCache.asMap().keySet().stream()
- .filter(key -> key.getNameMapping().getCtlId() == catalogId
- &&
key.getNameMapping().getLocalTblName().equals(dbName))
- .forEach(snapshotCache::invalidate);
- }
-
- public PaimonSnapshotCacheValue getPaimonSnapshot(ExternalTable
dorisTable) {
- PaimonSnapshotCacheKey key = new
PaimonSnapshotCacheKey(dorisTable.getOrBuildNameMapping());
- return snapshotCache.get(key);
+ tableCache.asMap().keySet().stream()
+ .filter(key ->
key.getNameMapping().getLocalDbName().equals(dbName))
+ .forEach(tableCache::invalidate);
}
public Map<String, Map<String, String>> getCacheStats() {
Map<String, Map<String, String>> res = Maps.newHashMap();
- res.put("paimon_snapshot_cache",
ExternalMetaCacheMgr.getCacheStats(snapshotCache.stats(),
- snapshotCache.estimatedSize()));
+ res.put("paimon_table_cache",
ExternalMetaCacheMgr.getCacheStats(tableCache.stats(),
+ tableCache.estimatedSize()));
return res;
}
+
+ static class PaimonTableCacheKey {
+ private final NameMapping nameMapping;
+
+ public PaimonTableCacheKey(NameMapping nameMapping) {
+ this.nameMapping = nameMapping;
+ }
+
+ public NameMapping getNameMapping() {
+ return nameMapping;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ PaimonTableCacheKey that = (PaimonTableCacheKey) o;
+ return nameMapping.equals(that.nameMapping);
+ }
+
+ @Override
+ public int hashCode() {
+ return nameMapping.hashCode();
+ }
+ }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonMetadataCacheMgr.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonMetadataCacheMgr.java
deleted file mode 100644
index 4493f918939..00000000000
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonMetadataCacheMgr.java
+++ /dev/null
@@ -1,51 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.datasource.paimon;
-
-import org.apache.doris.datasource.ExternalTable;
-
-import java.util.concurrent.ExecutorService;
-
-public class PaimonMetadataCacheMgr {
-
- private PaimonMetadataCache paimonMetadataCache;
-
- public PaimonMetadataCacheMgr(ExecutorService executor) {
- this.paimonMetadataCache = new PaimonMetadataCache(executor);
- }
-
- public PaimonMetadataCache getPaimonMetadataCache() {
- return paimonMetadataCache;
- }
-
- public void removeCache(long catalogId) {
- paimonMetadataCache.invalidateCatalogCache(catalogId);
- }
-
- public void invalidateCatalogCache(long catalogId) {
- paimonMetadataCache.invalidateCatalogCache(catalogId);
- }
-
- public void invalidateTableCache(ExternalTable dorisTable) {
- paimonMetadataCache.invalidateTableCache(dorisTable);
- }
-
- public void invalidateDbCache(long catalogId, String dbName) {
- paimonMetadataCache.invalidateDbCache(catalogId, dbName);
- }
-}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSnapshotCacheKey.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSnapshotCacheKey.java
deleted file mode 100644
index 6154d607f0b..00000000000
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSnapshotCacheKey.java
+++ /dev/null
@@ -1,60 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.datasource.paimon;
-
-import org.apache.doris.datasource.NameMapping;
-
-import java.util.StringJoiner;
-
-public class PaimonSnapshotCacheKey {
- private final NameMapping nameMapping;
-
- public PaimonSnapshotCacheKey(NameMapping nameMapping) {
- this.nameMapping = nameMapping;
- }
-
- public NameMapping getNameMapping() {
- return nameMapping;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- PaimonSnapshotCacheKey that = (PaimonSnapshotCacheKey) o;
- return nameMapping.equals(that.nameMapping);
- }
-
- @Override
- public int hashCode() {
- return nameMapping.hashCode();
- }
-
- @Override
- public String toString() {
- return new StringJoiner(", ",
PaimonSnapshotCacheKey.class.getSimpleName() + "[", "]")
- .add("catalog=" + nameMapping.getCtlId())
- .add("dbName='" + nameMapping.getLocalDbName() + "'")
- .add("tableName='" + nameMapping.getLocalTblName() + "'")
- .toString();
- }
-}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonTableCacheValue.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonTableCacheValue.java
new file mode 100644
index 00000000000..cbbd9076b65
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonTableCacheValue.java
@@ -0,0 +1,61 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.paimon;
+
+import org.apache.paimon.table.Table;
+
+import java.util.function.Supplier;
+
+/**
+ * Cache value for Paimon table metadata.
+ * Encapsulates the Paimon Table object and provides lazy loading for snapshot
cache.
+ */
+public class PaimonTableCacheValue {
+ private final Table paimonTable;
+
+ // Lazy-loaded snapshot cache
+ private volatile boolean snapshotCacheLoaded;
+ private volatile PaimonSnapshotCacheValue snapshotCacheValue;
+
+ public PaimonTableCacheValue(Table paimonTable) {
+ this.paimonTable = paimonTable;
+ }
+
+ public Table getPaimonTable() {
+ return paimonTable;
+ }
+
+ /**
+ * Get snapshot cache value with lazy loading.
+ * Uses double-checked locking to ensure thread-safe initialization.
+ *
+ * @param loader Supplier to load snapshot cache value when needed
+ * @return The cached or newly loaded snapshot cache value
+ */
+ public PaimonSnapshotCacheValue
getSnapshotCacheValue(Supplier<PaimonSnapshotCacheValue> loader) {
+ if (!snapshotCacheLoaded) {
+ synchronized (this) {
+ if (!snapshotCacheLoaded) {
+ snapshotCacheValue = loader.get();
+ snapshotCacheLoaded = true;
+ }
+ }
+ }
+ return snapshotCacheValue;
+ }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonUtils.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonUtils.java
new file mode 100644
index 00000000000..30ec4a1185e
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonUtils.java
@@ -0,0 +1,64 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.paimon;
+
+import org.apache.doris.catalog.Env;
+import org.apache.doris.datasource.ExternalCatalog;
+import org.apache.doris.datasource.ExternalTable;
+import org.apache.doris.datasource.mvcc.MvccSnapshot;
+
+import org.apache.paimon.table.Table;
+
+import java.util.Optional;
+
+public class PaimonUtils {
+
+ public static Table getPaimonTable(ExternalTable dorisTable) {
+ return
paimonMetadataCache(dorisTable.getCatalog()).getPaimonTable(dorisTable);
+ }
+
+ public static PaimonSnapshotCacheValue
getLatestSnapshotCacheValue(ExternalTable dorisTable) {
+ return
paimonMetadataCache(dorisTable.getCatalog()).getSnapshotCache(dorisTable);
+ }
+
+ public static PaimonSnapshotCacheValue
getSnapshotCacheValue(Optional<MvccSnapshot> snapshot,
+ ExternalTable dorisTable) {
+ if (snapshot.isPresent() && snapshot.get() instanceof
PaimonMvccSnapshot) {
+ return ((PaimonMvccSnapshot)
snapshot.get()).getSnapshotCacheValue();
+ }
+ return getLatestSnapshotCacheValue(dorisTable);
+ }
+
+ public static PaimonSchemaCacheValue getSchemaCacheValue(ExternalTable
dorisTable,
+ PaimonSnapshotCacheValue snapshotValue) {
+ return getSchemaCacheValue(dorisTable,
snapshotValue.getSnapshot().getSchemaId());
+ }
+
+ public static PaimonSchemaCacheValue getSchemaCacheValue(ExternalTable
dorisTable, long schemaId) {
+ return paimonMetadataCache(dorisTable.getCatalog())
+ .getPaimonSchemaCacheValue(dorisTable.getOrBuildNameMapping(),
schemaId);
+ }
+
+ public static PaimonMetadataCache getPaimonMetadataCache(ExternalCatalog
catalog) {
+ return paimonMetadataCache(catalog);
+ }
+
+ private static PaimonMetadataCache paimonMetadataCache(ExternalCatalog
catalog) {
+ return
Env.getCurrentEnv().getExtMetaCacheMgr().getPaimonMetadataCache(catalog);
+ }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
index 5a7ecec791f..1783477b7ad 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
@@ -19,7 +19,6 @@ package org.apache.doris.datasource.paimon.source;
import org.apache.doris.analysis.TableScanParams;
import org.apache.doris.analysis.TupleDescriptor;
-import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.MetaNotFoundException;
@@ -33,6 +32,7 @@ import
org.apache.doris.datasource.credentials.VendedCredentialsFactory;
import org.apache.doris.datasource.paimon.PaimonExternalCatalog;
import org.apache.doris.datasource.paimon.PaimonExternalTable;
import org.apache.doris.datasource.paimon.PaimonUtil;
+import org.apache.doris.datasource.paimon.PaimonUtils;
import org.apache.doris.datasource.paimon.profile.PaimonMetricRegistry;
import org.apache.doris.datasource.paimon.profile.PaimonScanMetricsReporter;
import org.apache.doris.datasource.property.storage.StorageProperties;
@@ -206,8 +206,7 @@ public class PaimonScanNode extends FileQueryScanNode {
private void putHistorySchemaInfo(Long schemaId) {
if (currentQuerySchema.putIfAbsent(schemaId, Boolean.TRUE) == null) {
PaimonExternalTable table = (PaimonExternalTable)
source.getTargetTable();
- TableSchema tableSchema =
Env.getCurrentEnv().getExtMetaCacheMgr().getPaimonMetadataCache()
- .getPaimonSchemaCacheValue(table.getOrBuildNameMapping(),
schemaId).getTableSchema();
+ TableSchema tableSchema = PaimonUtils.getSchemaCacheValue(table,
schemaId).getTableSchema();
params.addToHistorySchemaInfo(
PaimonUtil.getSchemaInfo(tableSchema,
source.getCatalog().getEnableMappingVarbinary(),
source.getCatalog().getEnableMappingTimestampTz()));
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/AbstractIcebergProperties.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/AbstractIcebergProperties.java
index cb2a3e15816..d9777d10b39 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/AbstractIcebergProperties.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/AbstractIcebergProperties.java
@@ -18,6 +18,8 @@
package org.apache.doris.datasource.property.metastore;
import org.apache.doris.common.security.authentication.ExecutionAuthenticator;
+import org.apache.doris.datasource.iceberg.IcebergExternalCatalog;
+import org.apache.doris.datasource.metacache.CacheSpec;
import org.apache.doris.datasource.property.ConnectorProperty;
import org.apache.doris.datasource.property.storage.StorageProperties;
@@ -136,6 +138,8 @@ public abstract class AbstractIcebergProperties extends
MetastoreProperties {
* @param catalogProps the catalog properties map to add manifest cache
properties to
*/
protected void addManifestCacheProperties(Map<String, String>
catalogProps) {
+ boolean hasIoManifestCacheEnabled =
StringUtils.isNotBlank(ioManifestCacheEnabled)
+ ||
StringUtils.isNotBlank(catalogProps.get(CatalogProperties.IO_MANIFEST_CACHE_ENABLED));
if (StringUtils.isNotBlank(ioManifestCacheEnabled)) {
catalogProps.put(CatalogProperties.IO_MANIFEST_CACHE_ENABLED,
ioManifestCacheEnabled);
}
@@ -149,6 +153,22 @@ public abstract class AbstractIcebergProperties extends
MetastoreProperties {
if (StringUtils.isNotBlank(ioManifestCacheMaxContentLength)) {
catalogProps.put(CatalogProperties.IO_MANIFEST_CACHE_MAX_CONTENT_LENGTH,
ioManifestCacheMaxContentLength);
}
+
+ // default enable io manifest cache if the meta.cache.manifest is
enabled
+ if (!hasIoManifestCacheEnabled) {
+ CacheSpec manifestCacheSpec =
CacheSpec.fromProperties(catalogProps,
+ IcebergExternalCatalog.ICEBERG_MANIFEST_CACHE_ENABLE,
+
IcebergExternalCatalog.DEFAULT_ICEBERG_MANIFEST_CACHE_ENABLE,
+ IcebergExternalCatalog.ICEBERG_MANIFEST_CACHE_TTL_SECOND,
+
IcebergExternalCatalog.DEFAULT_ICEBERG_MANIFEST_CACHE_TTL_SECOND,
+ IcebergExternalCatalog.ICEBERG_MANIFEST_CACHE_CAPACITY,
+
IcebergExternalCatalog.DEFAULT_ICEBERG_MANIFEST_CACHE_CAPACITY);
+ if (CacheSpec.isCacheEnabled(manifestCacheSpec.isEnable(),
+ manifestCacheSpec.getTtlSecond(),
+ manifestCacheSpec.getCapacity())) {
+ catalogProps.put(CatalogProperties.IO_MANIFEST_CACHE_ENABLED,
"true");
+ }
+ }
}
/**
diff --git
a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
index a8eaf8bc2f4..2af0aef9710 100644
---
a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
+++
b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
@@ -1519,16 +1519,47 @@ class Suite implements GroovyInterceptable {
// Split by semicolon and execute each statement
def statements = sqlStatements.split(';').collect { it.trim()
}.findAll { it }
def results = []
-
+
for (stmt in statements) {
if (stmt) {
results << spark_iceberg(stmt, timeoutSeconds)
}
}
-
+
return results
}
+ /**
+ * Execute Spark SQL on the spark-iceberg container with Paimon extensions
enabled.
+ *
+ * Usage in test suite:
+ * spark_paimon "CREATE TABLE paimon.test_db.t1 (id INT) USING paimon"
+ * spark_paimon "INSERT INTO paimon.test_db.t1 VALUES (1)"
+ * def result = spark_paimon "SELECT * FROM paimon.test_db.t1"
+ */
+ String spark_paimon(String sqlStr, int timeoutSeconds = 120) {
+ String containerName = getSparkIcebergContainerName()
+ if (containerName == null) {
+ throw new RuntimeException("spark-iceberg container not found.
Please ensure the container is running.")
+ }
+ String masterUrl = "spark://${containerName}:7077"
+
+ String escapedSql = sqlStr.replaceAll('"', '\\\\"')
+ String command = """docker exec ${containerName} spark-sql --master
${masterUrl} --conf
spark.sql.extensions=org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions
-e "${escapedSql}" """
+
+ logger.info("Executing Spark Paimon SQL: ${sqlStr}".toString())
+ logger.info("Container: ${containerName}".toString())
+
+ try {
+ String result = cmd(command, timeoutSeconds)
+ logger.info("Spark Paimon SQL result: ${result}".toString())
+ return result
+ } catch (Exception e) {
+ logger.error("Spark Paimon SQL failed: ${e.message}".toString())
+ throw e
+ }
+ }
+
List<List<Object>> db2_docker(String sqlStr, boolean isOrder = false) {
String cleanedSqlStr = sqlStr.replaceAll("\\s*;\\s*\$", "")
def (result, meta) =
JdbcUtils.executeToList(context.getDB2DockerConnection(), cleanedSqlStr)
diff --git
a/regression-test/suites/external_table_p0/iceberg/test_iceberg_manifest_cache.groovy
b/regression-test/suites/external_table_p0/iceberg/test_iceberg_manifest_cache.groovy
index d95f05ae76c..fc7a21785f7 100644
---
a/regression-test/suites/external_table_p0/iceberg/test_iceberg_manifest_cache.groovy
+++
b/regression-test/suites/external_table_p0/iceberg/test_iceberg_manifest_cache.groovy
@@ -39,7 +39,7 @@ suite("test_iceberg_manifest_cache",
"p0,external,doris,external_docker,external
"s3.secret_key" = "password",
"s3.endpoint" = "http://${externalEnvIp}:${minioPort}",
"s3.region" = "us-east-1",
- "iceberg.manifest.cache.enable" = "true"
+ "meta.cache.iceberg.manifest.enable" = "true"
);
"""
@@ -54,7 +54,7 @@ suite("test_iceberg_manifest_cache",
"p0,external,doris,external_docker,external
"s3.secret_key" = "password",
"s3.endpoint" = "http://${externalEnvIp}:${minioPort}",
"s3.region" = "us-east-1",
- "iceberg.manifest.cache.enable" = "false"
+ "meta.cache.iceberg.manifest.enable" = "false"
);
"""
@@ -116,4 +116,3 @@ suite("test_iceberg_manifest_cache",
"p0,external,doris,external_docker,external
}
}
}
-
diff --git
a/regression-test/suites/external_table_p0/iceberg/test_iceberg_table_cache.groovy
b/regression-test/suites/external_table_p0/iceberg/test_iceberg_table_cache.groovy
index 7cc9f6af0b7..cf9ad8145d9 100644
---
a/regression-test/suites/external_table_p0/iceberg/test_iceberg_table_cache.groovy
+++
b/regression-test/suites/external_table_p0/iceberg/test_iceberg_table_cache.groovy
@@ -58,7 +58,7 @@ suite("test_iceberg_table_cache",
"p0,external,doris,external_docker,external_do
"s3.secret_key" = "password",
"s3.endpoint" = "http://${externalEnvIp}:${minioPort}",
"s3.region" = "us-east-1",
- "iceberg.table.meta.cache.ttl-second" = "0"
+ "meta.cache.iceberg.table.ttl-second" = "0"
);
"""
diff --git
a/regression-test/suites/external_table_p0/iceberg/test_iceberg_table_meta_cache.groovy
b/regression-test/suites/external_table_p0/iceberg/test_iceberg_table_meta_cache.groovy
index 0f3391805b0..2e2a2ea8e9b 100644
---
a/regression-test/suites/external_table_p0/iceberg/test_iceberg_table_meta_cache.groovy
+++
b/regression-test/suites/external_table_p0/iceberg/test_iceberg_table_meta_cache.groovy
@@ -37,7 +37,7 @@ suite("test_iceberg_table_meta_cache",
"p0,external,doris,external_docker,extern
'hive.metastore.uris' = 'thrift://${externalEnvIp}:${hmsPort}',
'fs.defaultFS' = '${default_fs}',
'warehouse' = '${warehouse}',
- 'iceberg.manifest.cache.enable' = 'true'
+ 'meta.cache.iceberg.manifest.enable' = 'true'
);
"""
sql """switch ${catalog_name}"""
@@ -65,7 +65,7 @@ suite("test_iceberg_table_meta_cache",
"p0,external,doris,external_docker,extern
sql """select * from test_iceberg_meta_cache_db.sales"""
sql """drop table test_iceberg_meta_cache_db.sales"""
- // 2. test catalog with iceberg.table.meta.cache.ttl-second
+ // 2. test catalog with meta.cache.iceberg.table.ttl-second
sql """drop catalog if exists ${catalog_name_no_cache};"""
test {
sql """
@@ -75,8 +75,8 @@ suite("test_iceberg_table_meta_cache",
"p0,external,doris,external_docker,extern
'hive.metastore.uris' =
'thrift://${externalEnvIp}:${hmsPort}',
'fs.defaultFS' = '${default_fs}',
'warehouse' = '${warehouse}',
- 'iceberg.manifest.cache.enable' = 'false',
- 'iceberg.table.meta.cache.ttl-second' = '-2'
+ 'meta.cache.iceberg.manifest.enable' = 'false',
+ 'meta.cache.iceberg.table.ttl-second' = '-2'
);
"""
exception "is wrong"
@@ -90,8 +90,8 @@ suite("test_iceberg_table_meta_cache",
"p0,external,doris,external_docker,extern
'hive.metastore.uris' = 'thrift://${externalEnvIp}:${hmsPort}',
'fs.defaultFS' = '${default_fs}',
'warehouse' = '${warehouse}',
- 'iceberg.manifest.cache.enable' = 'false',
- 'iceberg.table.meta.cache.ttl-second' = '0'
+ 'meta.cache.iceberg.manifest.enable' = 'false',
+ 'meta.cache.iceberg.table.ttl-second' = '0'
);
"""
sql """switch ${catalog_name_no_cache}"""
@@ -126,7 +126,7 @@ suite("test_iceberg_table_meta_cache",
"p0,external,doris,external_docker,extern
'hive.metastore.uris' = 'thrift://${externalEnvIp}:${hmsPort}',
'fs.defaultFS' = '${default_fs}',
'warehouse' = '${warehouse}',
- 'iceberg.manifest.cache.enable' = 'false'
+ 'meta.cache.iceberg.manifest.enable' = 'false'
);
"""
sql """switch ${catalog_name_no_cache}"""
@@ -147,11 +147,11 @@ suite("test_iceberg_table_meta_cache",
"p0,external,doris,external_docker,extern
sql """select * from test_iceberg_meta_cache_db.sales"""
// alter wrong catalog property
test {
- sql """alter catalog ${catalog_name_no_cache} set properties
("iceberg.table.meta.cache.ttl-second" = "-2")"""
+ sql """alter catalog ${catalog_name_no_cache} set properties
("meta.cache.iceberg.table.ttl-second" = "-2")"""
exception "is wrong"
}
// alter catalog property, disable meta cache
- sql """alter catalog ${catalog_name_no_cache} set properties
("iceberg.table.meta.cache.ttl-second" = "0")"""
+ sql """alter catalog ${catalog_name_no_cache} set properties
("meta.cache.iceberg.table.ttl-second" = "0")"""
// select 2 rows
sql """select * from test_iceberg_meta_cache_db.sales"""
// insert into new value
diff --git
a/regression-test/suites/external_table_p0/paimon/test_paimon_table_meta_cache.groovy
b/regression-test/suites/external_table_p0/paimon/test_paimon_table_meta_cache.groovy
new file mode 100644
index 00000000000..2a3176688f5
--- /dev/null
+++
b/regression-test/suites/external_table_p0/paimon/test_paimon_table_meta_cache.groovy
@@ -0,0 +1,128 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_paimon_table_meta_cache", "p0,external,doris,external_docker,external_docker_doris") {
+ String enabled = context.config.otherConfigs.get("enablePaimonTest")
+ if (enabled == null || !enabled.equalsIgnoreCase("true")) {
+ logger.info("disable paimon test.")
+ return
+ }
+
+ String minioPort = context.config.otherConfigs.get("iceberg_minio_port")
+ String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+
+ String catalogWithCache = "test_paimon_table_cache_with_cache"
+ String catalogNoCache = "test_paimon_table_cache_no_cache"
+ String testDb = "paimon_cache_test_db"
+
+ sql """drop catalog if exists ${catalogWithCache}"""
+ sql """drop catalog if exists ${catalogNoCache}"""
+
+ sql """
+ CREATE CATALOG ${catalogWithCache} PROPERTIES (
+ 'type' = 'paimon',
+ 'warehouse' = 's3://warehouse/wh',
+ 's3.endpoint' = 'http://${externalEnvIp}:${minioPort}',
+ 's3.access_key' = 'admin',
+ 's3.secret_key' = 'password',
+ 's3.path.style.access' = 'true'
+ );
+ """
+
+ sql """
+ CREATE CATALOG ${catalogNoCache} PROPERTIES (
+ 'type' = 'paimon',
+ 'warehouse' = 's3://warehouse/wh',
+ 's3.endpoint' = 'http://${externalEnvIp}:${minioPort}',
+ 's3.access_key' = 'admin',
+ 's3.secret_key' = 'password',
+ 's3.path.style.access' = 'true',
+ 'meta.cache.paimon.table.ttl-second' = '0'
+ );
+ """
+
+ try {
+ spark_paimon "CREATE DATABASE IF NOT EXISTS paimon.${testDb}"
+
+ // ==================== Test 1: DML (INSERT) ====================
+ logger.info("========== Test 1: DML (INSERT) ==========")
+ spark_paimon "DROP TABLE IF EXISTS paimon.${testDb}.test_insert"
+ spark_paimon "CREATE TABLE paimon.${testDb}.test_insert (id INT, name STRING) USING paimon"
+ spark_paimon "INSERT INTO paimon.${testDb}.test_insert VALUES (1, 'initial')"
+
+ sql """switch ${catalogWithCache}"""
+ def result1 = sql """select * from ${testDb}.test_insert order by id"""
+ assertEquals(1, result1.size())
+
+ sql """switch ${catalogNoCache}"""
+ def result1NoCache = sql """select * from ${testDb}.test_insert order by id"""
+ assertEquals(1, result1NoCache.size())
+
+ spark_paimon "INSERT INTO paimon.${testDb}.test_insert VALUES (2, 'external_insert')"
+
+ sql """switch ${catalogWithCache}"""
+ def result2 = sql """select * from ${testDb}.test_insert order by id"""
+ assertEquals(1, result2.size())
+
+ sql """switch ${catalogNoCache}"""
+ def result2NoCache = sql """select * from ${testDb}.test_insert order by id"""
+ assertEquals(2, result2NoCache.size())
+
+ sql """switch ${catalogWithCache}"""
+ sql """refresh table ${testDb}.test_insert"""
+ def result3 = sql """select * from ${testDb}.test_insert order by id"""
+ assertEquals(2, result3.size())
+
+ // ==================== Test 2: Schema Change (ADD COLUMN) ====================
+ logger.info("========== Test 2: Schema Change (ADD COLUMN) ==========")
+ spark_paimon "DROP TABLE IF EXISTS paimon.${testDb}.test_add_column"
+ spark_paimon "CREATE TABLE paimon.${testDb}.test_add_column (id INT, name STRING) USING paimon"
+ spark_paimon "INSERT INTO paimon.${testDb}.test_add_column VALUES (1, 'test')"
+
+ sql """switch ${catalogWithCache}"""
+ def addColDesc1 = sql """desc ${testDb}.test_add_column"""
+ assertEquals(2, addColDesc1.size())
+
+ sql """switch ${catalogNoCache}"""
+ def addColDesc1NoCache = sql """desc ${testDb}.test_add_column"""
+ assertEquals(2, addColDesc1NoCache.size())
+
+ spark_paimon "ALTER TABLE paimon.${testDb}.test_add_column ADD COLUMNS (new_col INT)"
+
+ sql """switch ${catalogWithCache}"""
+ def addColDesc2 = sql """desc ${testDb}.test_add_column"""
+ assertEquals(2, addColDesc2.size())
+
+ sql """switch ${catalogNoCache}"""
+ def addColDesc2NoCache = sql """desc ${testDb}.test_add_column"""
+ assertEquals(3, addColDesc2NoCache.size())
+
+ sql """switch ${catalogWithCache}"""
+ sql """refresh table ${testDb}.test_add_column"""
+ def addColDesc3 = sql """desc ${testDb}.test_add_column"""
+ assertEquals(3, addColDesc3.size())
+ } finally {
+ try {
+ spark_paimon "DROP TABLE IF EXISTS paimon.${testDb}.test_insert"
+ spark_paimon "DROP TABLE IF EXISTS paimon.${testDb}.test_add_column"
+ } catch (Exception e) {
+ logger.warn("Failed to drop paimon tables: ${e.message}".toString())
+ }
+ sql """drop catalog if exists ${catalogWithCache}"""
+ sql """drop catalog if exists ${catalogNoCache}"""
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]