This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 5a8bde0e9 [core] Allow cache and refresh partitions for CachingCatalog
(#4427)
5a8bde0e9 is described below
commit 5a8bde0e9c0f2f41864fa8687b55e2d72d57b0ee
Author: Jiao Mingye <[email protected]>
AuthorDate: Tue Nov 5 13:31:27 2024 +0800
[core] Allow cache and refresh partitions for CachingCatalog (#4427)
---
.../generated/catalog_configuration.html | 6 ++
.../org/apache/paimon/options/CatalogOptions.java | 7 +++
.../org/apache/paimon/catalog/CachingCatalog.java | 72 +++++++++++++++++++++-
.../apache/paimon/catalog/CachingCatalogTest.java | 62 ++++++++++++++-----
.../paimon/catalog/TestableCachingCatalog.java | 14 +++--
5 files changed, 138 insertions(+), 23 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/catalog_configuration.html
b/docs/layouts/shortcodes/generated/catalog_configuration.html
index b9d74c812..94f11c6c4 100644
--- a/docs/layouts/shortcodes/generated/catalog_configuration.html
+++ b/docs/layouts/shortcodes/generated/catalog_configuration.html
@@ -44,6 +44,12 @@ under the License.
<td>Duration</td>
<td>Controls the duration for which databases and tables in the
catalog are cached.</td>
</tr>
+ <tr>
+ <td><h5>cache.partition.max-num</h5></td>
+ <td style="word-wrap: break-word;">0</td>
+ <td>Long</td>
+ <td>Controls the max number for which partitions in the catalog
are cached.</td>
+ </tr>
<tr>
<td><h5>cache.manifest.max-memory</h5></td>
<td style="word-wrap: break-word;">(none)</td>
diff --git
a/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java
b/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java
index 081668675..0d8a9290a 100644
--- a/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java
@@ -98,6 +98,13 @@ public class CatalogOptions {
.withDescription(
"Controls the duration for which databases and
tables in the catalog are cached.");
+ public static final ConfigOption<Long> CACHE_PARTITION_MAX_NUM = // read by CachingCatalog.tryToCreate and passed on as cachedPartitionMaxNum
+ key("cache.partition.max-num")
+ .longType()
+ .defaultValue(0L) // with 0, CachingCatalog sets its partitionCache to null (no partition caching)
+ .withDescription(
+ "Controls the max number for which partitions in
the catalog are cached.");
+
public static final ConfigOption<MemorySize>
CACHE_MANIFEST_SMALL_FILE_MEMORY =
key("cache.manifest.small-file-memory")
.memoryType()
diff --git
a/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java
b/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java
index 077775945..444a828af 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java
@@ -19,6 +19,7 @@
package org.apache.paimon.catalog;
import org.apache.paimon.fs.Path;
+import org.apache.paimon.manifest.PartitionEntry;
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.SchemaChange;
@@ -52,6 +53,7 @@ import static
org.apache.paimon.options.CatalogOptions.CACHE_EXPIRATION_INTERVAL
import static
org.apache.paimon.options.CatalogOptions.CACHE_MANIFEST_MAX_MEMORY;
import static
org.apache.paimon.options.CatalogOptions.CACHE_MANIFEST_SMALL_FILE_MEMORY;
import static
org.apache.paimon.options.CatalogOptions.CACHE_MANIFEST_SMALL_FILE_THRESHOLD;
+import static org.apache.paimon.options.CatalogOptions.CACHE_PARTITION_MAX_NUM;
import static org.apache.paimon.table.system.SystemTableLoader.SYSTEM_TABLES;
/** A {@link Catalog} to cache databases and tables and manifests. */
@@ -61,26 +63,31 @@ public class CachingCatalog extends DelegateCatalog {
protected final Cache<String, Map<String, String>> databaseCache;
protected final Cache<Identifier, Table> tableCache;
+ @Nullable protected final Cache<Identifier, List<PartitionEntry>>
partitionCache;
@Nullable protected final SegmentsCache<Path> manifestCache;
+ private final long cachedPartitionMaxNum;
public CachingCatalog(Catalog wrapped) {
this(
wrapped,
CACHE_EXPIRATION_INTERVAL_MS.defaultValue(),
CACHE_MANIFEST_SMALL_FILE_MEMORY.defaultValue(),
- CACHE_MANIFEST_SMALL_FILE_THRESHOLD.defaultValue().getBytes());
+ CACHE_MANIFEST_SMALL_FILE_THRESHOLD.defaultValue().getBytes(),
+ CACHE_PARTITION_MAX_NUM.defaultValue()); // option default is 0L, so this constructor leaves the partition cache disabled
}
public CachingCatalog(
Catalog wrapped,
Duration expirationInterval,
MemorySize manifestMaxMemory,
- long manifestCacheThreshold) {
+ long manifestCacheThreshold,
+ long cachedPartitionMaxNum) { // used as the partition cache's maximumWeight; 0 means no partition cache is created
this(
wrapped,
expirationInterval,
manifestMaxMemory,
manifestCacheThreshold,
+ cachedPartitionMaxNum,
Ticker.systemTicker());
}
@@ -89,6 +96,7 @@ public class CachingCatalog extends DelegateCatalog {
Duration expirationInterval,
MemorySize manifestMaxMemory,
long manifestCacheThreshold,
+ long cachedPartitionMaxNum,
Ticker ticker) {
super(wrapped);
if (expirationInterval.isZero() || expirationInterval.isNegative()) {
@@ -111,7 +119,19 @@ public class CachingCatalog extends DelegateCatalog {
.expireAfterAccess(expirationInterval)
.ticker(ticker)
.build();
+ this.partitionCache =
+ cachedPartitionMaxNum == 0
+ ? null
+ : Caffeine.newBuilder()
+ .softValues()
+ .executor(Runnable::run)
+ .expireAfterAccess(expirationInterval)
+ .weigher(this::weigh)
+ .maximumWeight(cachedPartitionMaxNum)
+ .ticker(ticker)
+ .build();
this.manifestCache = SegmentsCache.create(manifestMaxMemory,
manifestCacheThreshold);
+ this.cachedPartitionMaxNum = cachedPartitionMaxNum;
}
public static Catalog tryToCreate(Catalog catalog, Options options) {
@@ -131,7 +151,8 @@ public class CachingCatalog extends DelegateCatalog {
catalog,
options.get(CACHE_EXPIRATION_INTERVAL_MS),
manifestMaxMemory,
- manifestThreshold);
+ manifestThreshold,
+ options.get(CACHE_PARTITION_MAX_NUM));
}
@Override
@@ -227,6 +248,51 @@ public class CachingCatalog extends DelegateCatalog {
tableCache.put(identifier, table);
}
+ public List<PartitionEntry> getPartitions(Identifier identifier) throws
TableNotExistException { // Returns the table's partition entries, served from partitionCache when caching applies to the table.
+ Table table = this.getTable(identifier);
+ if (partitionCacheEnabled(table)) { // true only if the cache exists and the table is a partitioned FileStoreTable
+ List<PartitionEntry> partitions;
+ partitions = partitionCache.getIfPresent(identifier);
+ if (partitions == null || partitions.isEmpty()) { // a cached empty list is treated like a miss and is re-read on every call
+ partitions = this.refreshPartitions(identifier);
+ }
+ return partitions;
+ }
+ return ((FileStoreTable) table).newSnapshotReader().partitionEntries(); // uncached path; NOTE(review): the cast is unguarded, so a Table that is not a FileStoreTable would raise ClassCastException here
+ }
+
+ public List<PartitionEntry> refreshPartitions(Identifier identifier)
+ throws TableNotExistException { // Re-reads the partition entries through a new snapshot reader and caches them when allowed.
+ Table table = this.getTable(identifier);
+ List<PartitionEntry> partitions =
+ ((FileStoreTable)
table).newSnapshotReader().partitionEntries(); // NOTE(review): cast happens before the instanceof check inside partitionCacheEnabled
+ if (partitionCacheEnabled(table)
+ &&
partitionCache.asMap().values().stream().mapToInt(List::size).sum() // entries currently cached, summed over all tables (int arithmetic)
+ < this.cachedPartitionMaxNum) { // NOTE(review): manual limit check in addition to the cache's own maximumWeight
+ partitionCache.put(identifier, partitions);
+ }
+ return partitions; // the freshly read list is returned whether or not it was cached
+ }
+
+ private boolean partitionCacheEnabled(Table table) { // Whether partition caching applies to the given table.
+ return partitionCache != null // null when the catalog was built with cachedPartitionMaxNum == 0
+ && table instanceof FileStoreTable
+ && !table.partitionKeys().isEmpty(); // tables without partition keys are never cached
+ }
+
+ private int weigh(Identifier identifier, List<PartitionEntry> partitions) { // Weigher registered on partitionCache via .weigher(this::weigh).
+ return partitions.size(); // one weight unit per partition entry, so maximumWeight bounds the total entries cached
+ }
+
+ @Override
+ public void dropPartition(Identifier identifier, Map<String, String>
partitions)
+ throws TableNotExistException, PartitionNotExistException {
+ wrapped.dropPartition(identifier, partitions); // delegate first; if this throws, the cache below is left untouched
+ if (partitionCache != null) {
+ partitionCache.invalidate(identifier); // evicts the table's whole cached list, not just the dropped partition
+ }
+ }
+
private class TableInvalidatingRemovalListener implements
RemovalListener<Identifier, Table> {
@Override
public void onRemoval(Identifier identifier, Table table, @NonNull
RemovalCause cause) {
diff --git
a/paimon-core/src/test/java/org/apache/paimon/catalog/CachingCatalogTest.java
b/paimon-core/src/test/java/org/apache/paimon/catalog/CachingCatalogTest.java
index d1f7eeb8a..d645c46bf 100644
---
a/paimon-core/src/test/java/org/apache/paimon/catalog/CachingCatalogTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/catalog/CachingCatalogTest.java
@@ -20,8 +20,10 @@ package org.apache.paimon.catalog;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.fs.Path;
+import org.apache.paimon.manifest.PartitionEntry;
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.options.Options;
+import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.sink.BatchTableCommit;
@@ -32,6 +34,8 @@ import org.apache.paimon.table.source.TableRead;
import org.apache.paimon.table.source.TableScan;
import org.apache.paimon.table.system.SystemTableLoader;
import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.types.VarCharType;
import org.apache.paimon.utils.FakeTicker;
import
org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache;
@@ -44,6 +48,7 @@ import java.io.FileNotFoundException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
@@ -51,6 +56,8 @@ import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import static java.util.Collections.emptyList;
+import static java.util.Collections.singletonList;
import static org.apache.paimon.data.BinaryString.fromString;
import static
org.apache.paimon.options.CatalogOptions.CACHE_MANIFEST_MAX_MEMORY;
import static
org.apache.paimon.options.CatalogOptions.CACHE_MANIFEST_SMALL_FILE_MEMORY;
@@ -113,15 +120,15 @@ class CachingCatalogTest extends CatalogTestBase {
Table table = catalog.getTable(tableIdent);
// Ensure table is cached with full ttl remaining upon creation
- assertThat(catalog.cache().asMap()).containsKey(tableIdent);
+ assertThat(catalog.tableCache().asMap()).containsKey(tableIdent);
assertThat(catalog.remainingAgeFor(tableIdent)).isPresent().get().isEqualTo(EXPIRATION_TTL);
ticker.advance(HALF_OF_EXPIRATION);
- assertThat(catalog.cache().asMap()).containsKey(tableIdent);
+ assertThat(catalog.tableCache().asMap()).containsKey(tableIdent);
assertThat(catalog.ageOf(tableIdent)).isPresent().get().isEqualTo(HALF_OF_EXPIRATION);
ticker.advance(HALF_OF_EXPIRATION.plus(Duration.ofSeconds(10)));
- assertThat(catalog.cache().asMap()).doesNotContainKey(tableIdent);
+ assertThat(catalog.tableCache().asMap()).doesNotContainKey(tableIdent);
assertThat(catalog.getTable(tableIdent))
.as("CachingCatalog should return a new instance after
expiration")
.isNotSameAs(table);
@@ -135,11 +142,11 @@ class CachingCatalogTest extends CatalogTestBase {
Identifier tableIdent = new Identifier("db", "tbl");
catalog.createTable(tableIdent, DEFAULT_TABLE_SCHEMA, false);
catalog.getTable(tableIdent);
- assertThat(catalog.cache().asMap()).containsKey(tableIdent);
+ assertThat(catalog.tableCache().asMap()).containsKey(tableIdent);
assertThat(catalog.ageOf(tableIdent)).isPresent().get().isEqualTo(Duration.ZERO);
ticker.advance(HALF_OF_EXPIRATION);
- assertThat(catalog.cache().asMap()).containsKey(tableIdent);
+ assertThat(catalog.tableCache().asMap()).containsKey(tableIdent);
assertThat(catalog.ageOf(tableIdent)).isPresent().get().isEqualTo(HALF_OF_EXPIRATION);
assertThat(catalog.remainingAgeFor(tableIdent))
.isPresent()
@@ -148,7 +155,7 @@ class CachingCatalogTest extends CatalogTestBase {
Duration oneMinute = Duration.ofMinutes(1L);
ticker.advance(oneMinute);
- assertThat(catalog.cache().asMap()).containsKey(tableIdent);
+ assertThat(catalog.tableCache().asMap()).containsKey(tableIdent);
assertThat(catalog.ageOf(tableIdent))
.isPresent()
.get()
@@ -175,17 +182,17 @@ class CachingCatalogTest extends CatalogTestBase {
Identifier tableIdent = new Identifier("db", "tbl");
catalog.createTable(tableIdent, DEFAULT_TABLE_SCHEMA, false);
Table table = catalog.getTable(tableIdent);
- assertThat(catalog.cache().asMap()).containsKey(tableIdent);
+ assertThat(catalog.tableCache().asMap()).containsKey(tableIdent);
assertThat(catalog.ageOf(tableIdent)).get().isEqualTo(Duration.ZERO);
ticker.advance(HALF_OF_EXPIRATION);
- assertThat(catalog.cache().asMap()).containsKey(tableIdent);
+ assertThat(catalog.tableCache().asMap()).containsKey(tableIdent);
assertThat(catalog.ageOf(tableIdent)).get().isEqualTo(HALF_OF_EXPIRATION);
for (Identifier sysTable : sysTables(tableIdent)) {
catalog.getTable(sysTable);
}
-
assertThat(catalog.cache().asMap()).containsKeys(sysTables(tableIdent));
+
assertThat(catalog.tableCache().asMap()).containsKeys(sysTables(tableIdent));
assertThat(Arrays.stream(sysTables(tableIdent)).map(catalog::ageOf))
.isNotEmpty()
.allMatch(age -> age.isPresent() &&
age.get().equals(Duration.ZERO));
@@ -209,17 +216,39 @@ class CachingCatalogTest extends CatalogTestBase {
// Move time forward so the data table drops.
ticker.advance(HALF_OF_EXPIRATION);
- assertThat(catalog.cache().asMap()).doesNotContainKey(tableIdent);
+ assertThat(catalog.tableCache().asMap()).doesNotContainKey(tableIdent);
Arrays.stream(sysTables(tableIdent))
.forEach(
sysTable ->
- assertThat(catalog.cache().asMap())
+ assertThat(catalog.tableCache().asMap())
.as(
"When a data table expires,
its sys tables should expire regardless of age")
.doesNotContainKeys(sysTable));
}
+ @Test
+ public void testPartitionCache() throws Exception {
+ TestableCachingCatalog catalog =
+ new TestableCachingCatalog(this.catalog, EXPIRATION_TTL,
ticker); // TestableCachingCatalog passes Long.MAX_VALUE as the partition limit, so partitionCache is non-null
+
+ Identifier tableIdent = new Identifier("db", "tbl");
+ Schema schema =
+ new Schema(
+ RowType.of(VarCharType.STRING_TYPE,
VarCharType.STRING_TYPE).getFields(),
+ singletonList("f0"), // presumably the partition keys argument (caching requires a partitioned table) - confirm against Schema
+ emptyList(),
+ Collections.emptyMap(),
+ "");
+ catalog.createTable(tableIdent, schema, false);
+ List<PartitionEntry> partitionEntryList =
catalog.getPartitions(tableIdent); // first call finds nothing cached and goes through refreshPartitions
+ assertThat(catalog.partitionCache().asMap()).containsKey(tableIdent);
+ List<PartitionEntry> partitionEntryListFromCache =
+ catalog.partitionCache().getIfPresent(tableIdent);
+ assertThat(partitionEntryListFromCache).isNotNull();
+
assertThat(partitionEntryListFromCache).containsAll(partitionEntryList); // cached list must cover everything getPartitions returned
+ }
+
@Test
public void testDeadlock() throws Exception {
Catalog underlyCatalog = this.catalog;
@@ -233,7 +262,7 @@ class CachingCatalogTest extends CatalogTestBase {
createdTables.add(tableIdent);
}
- Cache<Identifier, Table> cache = catalog.cache();
+ Cache<Identifier, Table> cache = catalog.tableCache();
AtomicInteger cacheGetCount = new AtomicInteger(0);
AtomicInteger cacheCleanupCount = new AtomicInteger(0);
ExecutorService executor = Executors.newFixedThreadPool(numThreads);
@@ -288,10 +317,10 @@ class CachingCatalogTest extends CatalogTestBase {
Identifier tableIdent = new Identifier("db", "tbl");
catalog.createTable(tableIdent, DEFAULT_TABLE_SCHEMA, false);
catalog.getTable(tableIdent);
- assertThat(catalog.cache().asMap()).containsKey(tableIdent);
+ assertThat(catalog.tableCache().asMap()).containsKey(tableIdent);
catalog.dropTable(tableIdent, false);
- assertThat(catalog.cache().asMap()).doesNotContainKey(tableIdent);
-
assertThat(wrappedCatalog.cache().asMap()).doesNotContainKey(tableIdent);
+ assertThat(catalog.tableCache().asMap()).doesNotContainKey(tableIdent);
+
assertThat(wrappedCatalog.tableCache().asMap()).doesNotContainKey(tableIdent);
}
public static Identifier[] sysTables(Identifier tableIdent) {
@@ -313,7 +342,8 @@ class CachingCatalogTest extends CatalogTestBase {
this.catalog,
Duration.ofSeconds(10),
MemorySize.ofMebiBytes(1),
- manifestCacheThreshold);
+ manifestCacheThreshold,
+ 0L);
Identifier tableIdent = new Identifier("db", "tbl");
catalog.dropTable(tableIdent, true);
catalog.createTable(tableIdent, DEFAULT_TABLE_SCHEMA, false);
diff --git
a/paimon-core/src/test/java/org/apache/paimon/catalog/TestableCachingCatalog.java
b/paimon-core/src/test/java/org/apache/paimon/catalog/TestableCachingCatalog.java
index 159f5edae..4c70a0232 100644
---
a/paimon-core/src/test/java/org/apache/paimon/catalog/TestableCachingCatalog.java
+++
b/paimon-core/src/test/java/org/apache/paimon/catalog/TestableCachingCatalog.java
@@ -18,6 +18,7 @@
package org.apache.paimon.catalog;
+import org.apache.paimon.manifest.PartitionEntry;
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.table.Table;
@@ -25,6 +26,7 @@ import
org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cach
import
org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Ticker;
import java.time.Duration;
+import java.util.List;
import java.util.Optional;
/**
@@ -36,18 +38,22 @@ public class TestableCachingCatalog extends CachingCatalog {
private final Duration cacheExpirationInterval;
public TestableCachingCatalog(Catalog catalog, Duration
expirationInterval, Ticker ticker) {
- super(catalog, expirationInterval, MemorySize.ZERO, Long.MAX_VALUE,
ticker);
+ super(catalog, expirationInterval, MemorySize.ZERO, Long.MAX_VALUE,
Long.MAX_VALUE, ticker);
this.cacheExpirationInterval = expirationInterval;
}
- public Cache<Identifier, Table> cache() {
+ public Cache<Identifier, Table> tableCache() {
// cleanUp must be called as tests apply assertions directly on the
underlying map, but
- // metadata
- // table map entries are cleaned up asynchronously.
+ // metadata table map entries are cleaned up asynchronously.
tableCache.cleanUp();
return tableCache;
}
+ public Cache<Identifier, List<PartitionEntry>> partitionCache() { // Test accessor for the partition cache (field is @Nullable in CachingCatalog; non-null here since this class passes Long.MAX_VALUE).
+ partitionCache.cleanUp(); // same reason as tableCache(): tests assert on the underlying map, so run cleanup first
+ return partitionCache;
+ }
+
public Optional<Duration> ageOf(Identifier identifier) {
return tableCache.policy().expireAfterAccess().get().ageOf(identifier);
}