This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 064db80c47 [core] Introduce 'cache.expire-after-write' to expire a max
time interval (#5574)
064db80c47 is described below
commit 064db80c47bb2d4b3959bb61b8a1d07e5896bfec
Author: Jingsong Lee <[email protected]>
AuthorDate: Thu May 8 09:57:44 2025 +0800
[core] Introduce 'cache.expire-after-write' to expire a max time interval
(#5574)
---
.../generated/catalog_configuration.html | 10 +++++--
.../org/apache/paimon/options/CatalogOptions.java | 14 +++++++--
.../org/apache/paimon/catalog/CachingCatalog.java | 33 ++++++++++++++-------
.../apache/paimon/catalog/CachingCatalogTest.java | 34 ++++++++++++++++++++--
.../paimon/catalog/TestableCachingCatalog.java | 17 +++++++----
5 files changed, 85 insertions(+), 23 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/catalog_configuration.html
b/docs/layouts/shortcodes/generated/catalog_configuration.html
index e1bd22a388..7a4041bba8 100644
--- a/docs/layouts/shortcodes/generated/catalog_configuration.html
+++ b/docs/layouts/shortcodes/generated/catalog_configuration.html
@@ -33,10 +33,16 @@ under the License.
<td>Controls whether the catalog will cache databases, tables,
manifests and partitions.</td>
</tr>
<tr>
- <td><h5>cache.expiration-interval</h5></td>
+ <td><h5>cache.expire-after-access</h5></td>
<td style="word-wrap: break-word;">10 min</td>
<td>Duration</td>
- <td>Controls the duration for which databases and tables in the
catalog are cached.</td>
+ <td>Cache expiration policy: marks cache entries to expire after a
specified duration has passed since their last access.</td>
+ </tr>
+ <tr>
+ <td><h5>cache.expire-after-write</h5></td>
+ <td style="word-wrap: break-word;">30 min</td>
+ <td>Duration</td>
+ <td>Cache expiration policy: marks cache entries to expire after a
specified duration has passed since their last refresh.</td>
</tr>
<tr>
<td><h5>cache.manifest.max-memory</h5></td>
diff --git
a/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java
b/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java
index 4d62c629de..080c8cd2d8 100644
--- a/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java
@@ -89,12 +89,20 @@ public class CatalogOptions {
.withDescription(
"Controls whether the catalog will cache
databases, tables, manifests and partitions.");
- public static final ConfigOption<Duration> CACHE_EXPIRATION_INTERVAL_MS =
- key("cache.expiration-interval")
+ public static final ConfigOption<Duration> CACHE_EXPIRE_AFTER_ACCESS =
+ key("cache.expire-after-access")
.durationType()
.defaultValue(Duration.ofMinutes(10))
+ .withFallbackKeys("cache.expiration-interval")
.withDescription(
- "Controls the duration for which databases and
tables in the catalog are cached.");
+ "Cache expiration policy: marks cache entries to
expire after a specified duration has passed since their last access.");
+
+ public static final ConfigOption<Duration> CACHE_EXPIRE_AFTER_WRITE =
+ key("cache.expire-after-write")
+ .durationType()
+ .defaultValue(Duration.ofMinutes(30))
+ .withDescription(
+ "Cache expiration policy: marks cache entries to
expire after a specified duration has passed since their last refresh.");
public static final ConfigOption<Long> CACHE_PARTITION_MAX_NUM =
key("cache.partition.max-num")
diff --git
a/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java
b/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java
index e033c07f55..b3c4c5928d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java
@@ -44,7 +44,8 @@ import java.util.Map;
import java.util.Optional;
import static org.apache.paimon.options.CatalogOptions.CACHE_ENABLED;
-import static
org.apache.paimon.options.CatalogOptions.CACHE_EXPIRATION_INTERVAL_MS;
+import static
org.apache.paimon.options.CatalogOptions.CACHE_EXPIRE_AFTER_ACCESS;
+import static
org.apache.paimon.options.CatalogOptions.CACHE_EXPIRE_AFTER_WRITE;
import static
org.apache.paimon.options.CatalogOptions.CACHE_MANIFEST_MAX_MEMORY;
import static
org.apache.paimon.options.CatalogOptions.CACHE_MANIFEST_SMALL_FILE_MEMORY;
import static
org.apache.paimon.options.CatalogOptions.CACHE_MANIFEST_SMALL_FILE_THRESHOLD;
@@ -57,7 +58,8 @@ public class CachingCatalog extends DelegateCatalog {
private final Options options;
- private final Duration expirationInterval;
+ private final Duration expireAfterAccess;
+ private final Duration expireAfterWrite;
private final int snapshotMaxNumPerTable;
private final long cachedPartitionMaxNum;
@@ -80,11 +82,17 @@ public class CachingCatalog extends DelegateCatalog {
manifestCacheThreshold = Long.MAX_VALUE;
}
- this.expirationInterval = options.get(CACHE_EXPIRATION_INTERVAL_MS);
- if (expirationInterval.isZero() || expirationInterval.isNegative()) {
+ this.expireAfterAccess = options.get(CACHE_EXPIRE_AFTER_ACCESS);
+ if (expireAfterAccess.isZero() || expireAfterAccess.isNegative()) {
throw new IllegalArgumentException(
- "When cache.expiration-interval is set to negative or 0,
the catalog cache should be disabled.");
+ "When 'cache.expire-after-access' is set to negative or 0,
the catalog cache should be disabled.");
}
+ this.expireAfterWrite = options.get(CACHE_EXPIRE_AFTER_WRITE);
+ if (expireAfterWrite.isZero() || expireAfterWrite.isNegative()) {
+ throw new IllegalArgumentException(
+ "When 'cache.expire-after-write' is set to negative or 0,
the catalog cache should be disabled.");
+ }
+
this.snapshotMaxNumPerTable =
options.get(CACHE_SNAPSHOT_MAX_NUM_PER_TABLE);
this.manifestCache = SegmentsCache.create(manifestMaxMemory,
manifestCacheThreshold);
@@ -98,14 +106,16 @@ public class CachingCatalog extends DelegateCatalog {
Caffeine.newBuilder()
.softValues()
.executor(Runnable::run)
- .expireAfterAccess(expirationInterval)
+ .expireAfterAccess(expireAfterAccess)
+ .expireAfterWrite(expireAfterWrite)
.ticker(ticker)
.build();
this.tableCache =
Caffeine.newBuilder()
.softValues()
.executor(Runnable::run)
- .expireAfterAccess(expirationInterval)
+ .expireAfterAccess(expireAfterAccess)
+ .expireAfterWrite(expireAfterWrite)
.ticker(ticker)
.build();
this.partitionCache =
@@ -114,7 +124,8 @@ public class CachingCatalog extends DelegateCatalog {
: Caffeine.newBuilder()
.softValues()
.executor(Runnable::run)
- .expireAfterAccess(expirationInterval)
+ .expireAfterAccess(expireAfterAccess)
+ .expireAfterWrite(expireAfterWrite)
.weigher(
(Weigher<Identifier, List<Partition>>)
(identifier, v) -> v.size())
@@ -239,14 +250,16 @@ public class CachingCatalog extends DelegateCatalog {
storeTable.setSnapshotCache(
Caffeine.newBuilder()
.softValues()
- .expireAfterAccess(expirationInterval)
+ .expireAfterAccess(expireAfterAccess)
+ .expireAfterWrite(expireAfterWrite)
.maximumSize(snapshotMaxNumPerTable)
.executor(Runnable::run)
.build());
storeTable.setStatsCache(
Caffeine.newBuilder()
.softValues()
- .expireAfterAccess(expirationInterval)
+ .expireAfterAccess(expireAfterAccess)
+ .expireAfterWrite(expireAfterWrite)
.maximumSize(5)
.executor(Runnable::run)
.build());
diff --git
a/paimon-core/src/test/java/org/apache/paimon/catalog/CachingCatalogTest.java
b/paimon-core/src/test/java/org/apache/paimon/catalog/CachingCatalogTest.java
index c8a43f9013..153f2af81d 100644
---
a/paimon-core/src/test/java/org/apache/paimon/catalog/CachingCatalogTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/catalog/CachingCatalogTest.java
@@ -64,7 +64,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import static java.util.Collections.emptyList;
import static java.util.Collections.singletonList;
import static org.apache.paimon.data.BinaryString.fromString;
-import static
org.apache.paimon.options.CatalogOptions.CACHE_EXPIRATION_INTERVAL_MS;
+import static
org.apache.paimon.options.CatalogOptions.CACHE_EXPIRE_AFTER_ACCESS;
import static
org.apache.paimon.options.CatalogOptions.CACHE_MANIFEST_MAX_MEMORY;
import static
org.apache.paimon.options.CatalogOptions.CACHE_MANIFEST_SMALL_FILE_MEMORY;
import static
org.apache.paimon.options.CatalogOptions.CACHE_MANIFEST_SMALL_FILE_THRESHOLD;
@@ -199,6 +199,34 @@ class CachingCatalogTest extends CatalogTestBase {
.isNotSameAs(table);
}
+ @Test
+ public void testTableExpiresAfterWrite() throws Exception {
+ TestableCachingCatalog catalog =
+ new TestableCachingCatalog(
+ this.catalog, Duration.ofMinutes(5),
Duration.ofMinutes(8), ticker);
+
+ Identifier tableIdent = new Identifier("db", "tbl");
+ catalog.createTable(tableIdent, DEFAULT_TABLE_SCHEMA, false);
+ Table table = catalog.getTable(tableIdent);
+
+ ticker.advance(Duration.ofMinutes(2));
+
+ // refresh from get
+ catalog.getTable(tableIdent);
+
+ // not expire
+ ticker.advance(Duration.ofMinutes(4));
+ assertThat(catalog.tableCache().asMap()).containsKey(tableIdent);
+ catalog.getTable(tableIdent);
+
+ // advance 10 minutes to expire from write
+ ticker.advance(HALF_OF_EXPIRATION.plus(Duration.ofSeconds(4)));
+ assertThat(catalog.tableCache().asMap()).doesNotContainKey(tableIdent);
+ assertThat(catalog.getTable(tableIdent))
+ .as("CachingCatalog should return a new instance after
expiration")
+ .isNotSameAs(table);
+ }
+
@Test
public void testCatalogExpirationTtlRefreshesAfterAccessViaCatalog()
throws Exception {
TestableCachingCatalog catalog =
@@ -320,7 +348,7 @@ class CachingCatalogTest extends CatalogTestBase {
() -> new TestableCachingCatalog(this.catalog,
Duration.ZERO, ticker))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage(
- "When cache.expiration-interval is set to negative or
0, the catalog cache should be disabled.");
+ "When 'cache.expire-after-access' is set to negative
or 0, the catalog cache should be disabled.");
}
@Test
@@ -378,7 +406,7 @@ class CachingCatalogTest extends CatalogTestBase {
private void innerTestManifestCache(long manifestCacheThreshold) throws
Exception {
Options options = new Options();
- options.set(CACHE_EXPIRATION_INTERVAL_MS, Duration.ofSeconds(10));
+ options.set(CACHE_EXPIRE_AFTER_ACCESS, Duration.ofSeconds(10));
options.set(CACHE_MANIFEST_SMALL_FILE_MEMORY,
MemorySize.ofMebiBytes(1));
options.set(
CACHE_MANIFEST_SMALL_FILE_THRESHOLD,
MemorySize.ofBytes(manifestCacheThreshold));
diff --git
a/paimon-core/src/test/java/org/apache/paimon/catalog/TestableCachingCatalog.java
b/paimon-core/src/test/java/org/apache/paimon/catalog/TestableCachingCatalog.java
index 05dbae004a..05277478aa 100644
---
a/paimon-core/src/test/java/org/apache/paimon/catalog/TestableCachingCatalog.java
+++
b/paimon-core/src/test/java/org/apache/paimon/catalog/TestableCachingCatalog.java
@@ -29,7 +29,8 @@ import java.time.Duration;
import java.util.List;
import java.util.Optional;
-import static
org.apache.paimon.options.CatalogOptions.CACHE_EXPIRATION_INTERVAL_MS;
+import static
org.apache.paimon.options.CatalogOptions.CACHE_EXPIRE_AFTER_ACCESS;
+import static
org.apache.paimon.options.CatalogOptions.CACHE_EXPIRE_AFTER_WRITE;
import static org.apache.paimon.options.CatalogOptions.CACHE_PARTITION_MAX_NUM;
/**
@@ -41,14 +42,20 @@ public class TestableCachingCatalog extends CachingCatalog {
private final Duration cacheExpirationInterval;
public TestableCachingCatalog(Catalog catalog, Duration
expirationInterval, Ticker ticker) {
- super(catalog, createOptions(expirationInterval));
+ this(catalog, expirationInterval, Duration.ofDays(1), ticker);
+ }
+
+ public TestableCachingCatalog(
+ Catalog catalog, Duration expireAfterAccess, Duration
expireAfterWrite, Ticker ticker) {
+ super(catalog, createOptions(expireAfterAccess, expireAfterWrite));
init(ticker);
- this.cacheExpirationInterval = expirationInterval;
+ this.cacheExpirationInterval = expireAfterAccess;
}
- private static Options createOptions(Duration expirationInterval) {
+ private static Options createOptions(Duration expireAfterAccess, Duration
expireAfterWrite) {
Options options = new Options();
- options.set(CACHE_EXPIRATION_INTERVAL_MS, expirationInterval);
+ options.set(CACHE_EXPIRE_AFTER_ACCESS, expireAfterAccess);
+ options.set(CACHE_EXPIRE_AFTER_WRITE, expireAfterWrite);
options.set(CACHE_PARTITION_MAX_NUM, 100L);
return options;
}