This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 064db80c47 [core] Introduce 'cache.expire-after-write' to expire a max time interval (#5574)
064db80c47 is described below

commit 064db80c47bb2d4b3959bb61b8a1d07e5896bfec
Author: Jingsong Lee <[email protected]>
AuthorDate: Thu May 8 09:57:44 2025 +0800

    [core] Introduce 'cache.expire-after-write' to expire a max time interval (#5574)
---
 .../generated/catalog_configuration.html           | 10 +++++--
 .../org/apache/paimon/options/CatalogOptions.java  | 14 +++++++--
 .../org/apache/paimon/catalog/CachingCatalog.java  | 33 ++++++++++++++-------
 .../apache/paimon/catalog/CachingCatalogTest.java  | 34 ++++++++++++++++++++--
 .../paimon/catalog/TestableCachingCatalog.java     | 17 +++++++----
 5 files changed, 85 insertions(+), 23 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/catalog_configuration.html 
b/docs/layouts/shortcodes/generated/catalog_configuration.html
index e1bd22a388..7a4041bba8 100644
--- a/docs/layouts/shortcodes/generated/catalog_configuration.html
+++ b/docs/layouts/shortcodes/generated/catalog_configuration.html
@@ -33,10 +33,16 @@ under the License.
             <td>Controls whether the catalog will cache databases, tables, 
manifests and partitions.</td>
         </tr>
         <tr>
-            <td><h5>cache.expiration-interval</h5></td>
+            <td><h5>cache.expire-after-access</h5></td>
             <td style="word-wrap: break-word;">10 min</td>
             <td>Duration</td>
-            <td>Controls the duration for which databases and tables in the 
catalog are cached.</td>
+            <td>Cache expiration policy: marks cache entries to expire after a 
specified duration has passed since their last access.</td>
+        </tr>
+        <tr>
+            <td><h5>cache.expire-after-write</h5></td>
+            <td style="word-wrap: break-word;">30 min</td>
+            <td>Duration</td>
+            <td>Cache expiration policy: marks cache entries to expire after a 
specified duration has passed since their last refresh.</td>
         </tr>
         <tr>
             <td><h5>cache.manifest.max-memory</h5></td>
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java 
b/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java
index 4d62c629de..080c8cd2d8 100644
--- a/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java
@@ -89,12 +89,20 @@ public class CatalogOptions {
                     .withDescription(
                             "Controls whether the catalog will cache 
databases, tables, manifests and partitions.");
 
-    public static final ConfigOption<Duration> CACHE_EXPIRATION_INTERVAL_MS =
-            key("cache.expiration-interval")
+    public static final ConfigOption<Duration> CACHE_EXPIRE_AFTER_ACCESS =
+            key("cache.expire-after-access")
                     .durationType()
                     .defaultValue(Duration.ofMinutes(10))
+                    .withFallbackKeys("cache.expiration-interval")
                     .withDescription(
-                            "Controls the duration for which databases and 
tables in the catalog are cached.");
+                            "Cache expiration policy: marks cache entries to 
expire after a specified duration has passed since their last access.");
+
+    public static final ConfigOption<Duration> CACHE_EXPIRE_AFTER_WRITE =
+            key("cache.expire-after-write")
+                    .durationType()
+                    .defaultValue(Duration.ofMinutes(30))
+                    .withDescription(
+                            "Cache expiration policy: marks cache entries to 
expire after a specified duration has passed since their last refresh.");
 
     public static final ConfigOption<Long> CACHE_PARTITION_MAX_NUM =
             key("cache.partition.max-num")
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java 
b/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java
index e033c07f55..b3c4c5928d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java
@@ -44,7 +44,8 @@ import java.util.Map;
 import java.util.Optional;
 
 import static org.apache.paimon.options.CatalogOptions.CACHE_ENABLED;
-import static 
org.apache.paimon.options.CatalogOptions.CACHE_EXPIRATION_INTERVAL_MS;
+import static 
org.apache.paimon.options.CatalogOptions.CACHE_EXPIRE_AFTER_ACCESS;
+import static 
org.apache.paimon.options.CatalogOptions.CACHE_EXPIRE_AFTER_WRITE;
 import static 
org.apache.paimon.options.CatalogOptions.CACHE_MANIFEST_MAX_MEMORY;
 import static 
org.apache.paimon.options.CatalogOptions.CACHE_MANIFEST_SMALL_FILE_MEMORY;
 import static 
org.apache.paimon.options.CatalogOptions.CACHE_MANIFEST_SMALL_FILE_THRESHOLD;
@@ -57,7 +58,8 @@ public class CachingCatalog extends DelegateCatalog {
 
     private final Options options;
 
-    private final Duration expirationInterval;
+    private final Duration expireAfterAccess;
+    private final Duration expireAfterWrite;
     private final int snapshotMaxNumPerTable;
     private final long cachedPartitionMaxNum;
 
@@ -80,11 +82,17 @@ public class CachingCatalog extends DelegateCatalog {
             manifestCacheThreshold = Long.MAX_VALUE;
         }
 
-        this.expirationInterval = options.get(CACHE_EXPIRATION_INTERVAL_MS);
-        if (expirationInterval.isZero() || expirationInterval.isNegative()) {
+        this.expireAfterAccess = options.get(CACHE_EXPIRE_AFTER_ACCESS);
+        if (expireAfterAccess.isZero() || expireAfterAccess.isNegative()) {
             throw new IllegalArgumentException(
-                    "When cache.expiration-interval is set to negative or 0, 
the catalog cache should be disabled.");
+                    "When 'cache.expire-after-access' is set to negative or 0, 
the catalog cache should be disabled.");
         }
+        this.expireAfterWrite = options.get(CACHE_EXPIRE_AFTER_WRITE);
+        if (expireAfterWrite.isZero() || expireAfterWrite.isNegative()) {
+            throw new IllegalArgumentException(
+                    "When 'cache.expire-after-write' is set to negative or 0, 
the catalog cache should be disabled.");
+        }
+
         this.snapshotMaxNumPerTable = 
options.get(CACHE_SNAPSHOT_MAX_NUM_PER_TABLE);
         this.manifestCache = SegmentsCache.create(manifestMaxMemory, 
manifestCacheThreshold);
 
@@ -98,14 +106,16 @@ public class CachingCatalog extends DelegateCatalog {
                 Caffeine.newBuilder()
                         .softValues()
                         .executor(Runnable::run)
-                        .expireAfterAccess(expirationInterval)
+                        .expireAfterAccess(expireAfterAccess)
+                        .expireAfterWrite(expireAfterWrite)
                         .ticker(ticker)
                         .build();
         this.tableCache =
                 Caffeine.newBuilder()
                         .softValues()
                         .executor(Runnable::run)
-                        .expireAfterAccess(expirationInterval)
+                        .expireAfterAccess(expireAfterAccess)
+                        .expireAfterWrite(expireAfterWrite)
                         .ticker(ticker)
                         .build();
         this.partitionCache =
@@ -114,7 +124,8 @@ public class CachingCatalog extends DelegateCatalog {
                         : Caffeine.newBuilder()
                                 .softValues()
                                 .executor(Runnable::run)
-                                .expireAfterAccess(expirationInterval)
+                                .expireAfterAccess(expireAfterAccess)
+                                .expireAfterWrite(expireAfterWrite)
                                 .weigher(
                                         (Weigher<Identifier, List<Partition>>)
                                                 (identifier, v) -> v.size())
@@ -239,14 +250,16 @@ public class CachingCatalog extends DelegateCatalog {
             storeTable.setSnapshotCache(
                     Caffeine.newBuilder()
                             .softValues()
-                            .expireAfterAccess(expirationInterval)
+                            .expireAfterAccess(expireAfterAccess)
+                            .expireAfterWrite(expireAfterWrite)
                             .maximumSize(snapshotMaxNumPerTable)
                             .executor(Runnable::run)
                             .build());
             storeTable.setStatsCache(
                     Caffeine.newBuilder()
                             .softValues()
-                            .expireAfterAccess(expirationInterval)
+                            .expireAfterAccess(expireAfterAccess)
+                            .expireAfterWrite(expireAfterWrite)
                             .maximumSize(5)
                             .executor(Runnable::run)
                             .build());
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/catalog/CachingCatalogTest.java 
b/paimon-core/src/test/java/org/apache/paimon/catalog/CachingCatalogTest.java
index c8a43f9013..153f2af81d 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/catalog/CachingCatalogTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/catalog/CachingCatalogTest.java
@@ -64,7 +64,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import static java.util.Collections.emptyList;
 import static java.util.Collections.singletonList;
 import static org.apache.paimon.data.BinaryString.fromString;
-import static 
org.apache.paimon.options.CatalogOptions.CACHE_EXPIRATION_INTERVAL_MS;
+import static 
org.apache.paimon.options.CatalogOptions.CACHE_EXPIRE_AFTER_ACCESS;
 import static 
org.apache.paimon.options.CatalogOptions.CACHE_MANIFEST_MAX_MEMORY;
 import static 
org.apache.paimon.options.CatalogOptions.CACHE_MANIFEST_SMALL_FILE_MEMORY;
 import static 
org.apache.paimon.options.CatalogOptions.CACHE_MANIFEST_SMALL_FILE_THRESHOLD;
@@ -199,6 +199,34 @@ class CachingCatalogTest extends CatalogTestBase {
                 .isNotSameAs(table);
     }
 
+    @Test
+    public void testTableExpiresAfterWrite() throws Exception {
+        TestableCachingCatalog catalog =
+                new TestableCachingCatalog(
+                        this.catalog, Duration.ofMinutes(5), 
Duration.ofMinutes(8), ticker);
+
+        Identifier tableIdent = new Identifier("db", "tbl");
+        catalog.createTable(tableIdent, DEFAULT_TABLE_SCHEMA, false);
+        Table table = catalog.getTable(tableIdent);
+
+        ticker.advance(Duration.ofMinutes(2));
+
+        // refresh from get
+        catalog.getTable(tableIdent);
+
+        // not expire
+        ticker.advance(Duration.ofMinutes(4));
+        assertThat(catalog.tableCache().asMap()).containsKey(tableIdent);
+        catalog.getTable(tableIdent);
+
+        // 6 minutes have passed since the write; advance beyond the 8-minute expire-after-write limit
+        ticker.advance(HALF_OF_EXPIRATION.plus(Duration.ofSeconds(4)));
+        assertThat(catalog.tableCache().asMap()).doesNotContainKey(tableIdent);
+        assertThat(catalog.getTable(tableIdent))
+                .as("CachingCatalog should return a new instance after 
expiration")
+                .isNotSameAs(table);
+    }
+
     @Test
     public void testCatalogExpirationTtlRefreshesAfterAccessViaCatalog() 
throws Exception {
         TestableCachingCatalog catalog =
@@ -320,7 +348,7 @@ class CachingCatalogTest extends CatalogTestBase {
                         () -> new TestableCachingCatalog(this.catalog, 
Duration.ZERO, ticker))
                 .isInstanceOf(IllegalArgumentException.class)
                 .hasMessage(
-                        "When cache.expiration-interval is set to negative or 
0, the catalog cache should be disabled.");
+                        "When 'cache.expire-after-access' is set to negative 
or 0, the catalog cache should be disabled.");
     }
 
     @Test
@@ -378,7 +406,7 @@ class CachingCatalogTest extends CatalogTestBase {
 
     private void innerTestManifestCache(long manifestCacheThreshold) throws 
Exception {
         Options options = new Options();
-        options.set(CACHE_EXPIRATION_INTERVAL_MS, Duration.ofSeconds(10));
+        options.set(CACHE_EXPIRE_AFTER_ACCESS, Duration.ofSeconds(10));
         options.set(CACHE_MANIFEST_SMALL_FILE_MEMORY, 
MemorySize.ofMebiBytes(1));
         options.set(
                 CACHE_MANIFEST_SMALL_FILE_THRESHOLD, 
MemorySize.ofBytes(manifestCacheThreshold));
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/catalog/TestableCachingCatalog.java
 
b/paimon-core/src/test/java/org/apache/paimon/catalog/TestableCachingCatalog.java
index 05dbae004a..05277478aa 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/catalog/TestableCachingCatalog.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/catalog/TestableCachingCatalog.java
@@ -29,7 +29,8 @@ import java.time.Duration;
 import java.util.List;
 import java.util.Optional;
 
-import static 
org.apache.paimon.options.CatalogOptions.CACHE_EXPIRATION_INTERVAL_MS;
+import static 
org.apache.paimon.options.CatalogOptions.CACHE_EXPIRE_AFTER_ACCESS;
+import static 
org.apache.paimon.options.CatalogOptions.CACHE_EXPIRE_AFTER_WRITE;
 import static org.apache.paimon.options.CatalogOptions.CACHE_PARTITION_MAX_NUM;
 
 /**
@@ -41,14 +42,20 @@ public class TestableCachingCatalog extends CachingCatalog {
     private final Duration cacheExpirationInterval;
 
     public TestableCachingCatalog(Catalog catalog, Duration 
expirationInterval, Ticker ticker) {
-        super(catalog, createOptions(expirationInterval));
+        this(catalog, expirationInterval, Duration.ofDays(1), ticker);
+    }
+
+    public TestableCachingCatalog(
+            Catalog catalog, Duration expireAfterAccess, Duration 
expireAfterWrite, Ticker ticker) {
+        super(catalog, createOptions(expireAfterAccess, expireAfterWrite));
         init(ticker);
-        this.cacheExpirationInterval = expirationInterval;
+        this.cacheExpirationInterval = expireAfterAccess;
     }
 
-    private static Options createOptions(Duration expirationInterval) {
+    private static Options createOptions(Duration expireAfterAccess, Duration 
expireAfterWrite) {
         Options options = new Options();
-        options.set(CACHE_EXPIRATION_INTERVAL_MS, expirationInterval);
+        options.set(CACHE_EXPIRE_AFTER_ACCESS, expireAfterAccess);
+        options.set(CACHE_EXPIRE_AFTER_WRITE, expireAfterWrite);
         options.set(CACHE_PARTITION_MAX_NUM, 100L);
         return options;
     }

Reply via email to