This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 2f4c8d0d6 [core] Introduce listPartitions interface to Catalog
2f4c8d0d6 is described below
commit 2f4c8d0d67f0513acc8868cf44b3672b13839c07
Author: Jingsong <[email protected]>
AuthorDate: Tue Nov 5 13:48:46 2024 +0800
[core] Introduce listPartitions interface to Catalog
---
.../org/apache/paimon/catalog/AbstractCatalog.java | 7 +++
.../org/apache/paimon/catalog/CachingCatalog.java | 52 +++++++---------------
.../java/org/apache/paimon/catalog/Catalog.java | 9 ++++
.../org/apache/paimon/catalog/DelegateCatalog.java | 7 +++
.../apache/paimon/catalog/CachingCatalogTest.java | 2 +-
5 files changed, 41 insertions(+), 36 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
index 1b390bc66..b3f255f10 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
@@ -24,6 +24,7 @@ import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.FileStatus;
import org.apache.paimon.fs.Path;
import org.apache.paimon.lineage.LineageMetaFactory;
+import org.apache.paimon.manifest.PartitionEntry;
import org.apache.paimon.metastore.MetastoreClient;
import org.apache.paimon.operation.FileStoreCommit;
import org.apache.paimon.operation.Lock;
@@ -198,6 +199,12 @@ public abstract class AbstractCatalog implements Catalog {
}
}
+ @Override
+ public List<PartitionEntry> listPartitions(Identifier identifier)
+ throws TableNotExistException {
+ return getTable(identifier).newReadBuilder().newScan().listPartitionEntries();
+ }
+
protected abstract void createDatabaseImpl(String name, Map<String, String> properties);
@Override
diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java
index 444a828af..b1f683e40 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java
@@ -34,6 +34,7 @@ import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Caff
import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.RemovalCause;
import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.RemovalListener;
import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Ticker;
+import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Weigher;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.slf4j.Logger;
@@ -63,9 +64,10 @@ public class CachingCatalog extends DelegateCatalog {
protected final Cache<String, Map<String, String>> databaseCache;
protected final Cache<Identifier, Table> tableCache;
- @Nullable protected final Cache<Identifier, List<PartitionEntry>> partitionCache;
@Nullable protected final SegmentsCache<Path> manifestCache;
- private final long cachedPartitionMaxNum;
+
+ // partition cache will affect data latency
+ @Nullable protected final Cache<Identifier, List<PartitionEntry>> partitionCache;
public CachingCatalog(Catalog wrapped) {
this(
@@ -126,12 +128,13 @@ public class CachingCatalog extends DelegateCatalog {
.softValues()
.executor(Runnable::run)
.expireAfterAccess(expirationInterval)
- .weigher(this::weigh)
+ .weigher(
+ (Weigher<Identifier, List<PartitionEntry>>)
+ (identifier, v) -> v.size())
.maximumWeight(cachedPartitionMaxNum)
.ticker(ticker)
.build();
this.manifestCache = SegmentsCache.create(manifestMaxMemory, manifestCacheThreshold);
- this.cachedPartitionMaxNum = cachedPartitionMaxNum;
}
public static Catalog tryToCreate(Catalog catalog, Options options) {
@@ -248,40 +251,19 @@ public class CachingCatalog extends DelegateCatalog {
tableCache.put(identifier, table);
}
- public List<PartitionEntry> getPartitions(Identifier identifier) throws TableNotExistException {
- Table table = this.getTable(identifier);
- if (partitionCacheEnabled(table)) {
- List<PartitionEntry> partitions;
- partitions = partitionCache.getIfPresent(identifier);
- if (partitions == null || partitions.isEmpty()) {
- partitions = this.refreshPartitions(identifier);
- }
- return partitions;
- }
- return ((FileStoreTable) table).newSnapshotReader().partitionEntries();
- }
-
- public List<PartitionEntry> refreshPartitions(Identifier identifier)
+ @Override
+ public List<PartitionEntry> listPartitions(Identifier identifier)
throws TableNotExistException {
- Table table = this.getTable(identifier);
- List<PartitionEntry> partitions =
- ((FileStoreTable) table).newSnapshotReader().partitionEntries();
- if (partitionCacheEnabled(table)
- && partitionCache.asMap().values().stream().mapToInt(List::size).sum()
- < this.cachedPartitionMaxNum) {
- partitionCache.put(identifier, partitions);
+ if (partitionCache == null) {
+ return wrapped.listPartitions(identifier);
}
- return partitions;
- }
- private boolean partitionCacheEnabled(Table table) {
- return partitionCache != null
- && table instanceof FileStoreTable
- && !table.partitionKeys().isEmpty();
- }
-
- private int weigh(Identifier identifier, List<PartitionEntry> partitions) {
- return partitions.size();
+ List<PartitionEntry> result = partitionCache.getIfPresent(identifier);
+ if (result == null) {
+ result = wrapped.listPartitions(identifier);
+ partitionCache.put(identifier, result);
+ }
+ return result;
}
@Override
diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
index 84e924a2a..c72c354e4 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
@@ -21,6 +21,7 @@ package org.apache.paimon.catalog;
import org.apache.paimon.annotation.Public;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
+import org.apache.paimon.manifest.PartitionEntry;
import org.apache.paimon.metastore.MetastoreClient;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaChange;
@@ -290,6 +291,14 @@ public interface Catalog extends AutoCloseable {
void dropPartition(Identifier identifier, Map<String, String> partitions)
throws TableNotExistException, PartitionNotExistException;
+ /**
+ * Get PartitionEntry of all partitions of the table.
+ *
+ * @param identifier path of the table to list partitions
+ * @throws TableNotExistException if the table does not exist
+ */
+ List<PartitionEntry> listPartitions(Identifier identifier) throws TableNotExistException;
+
/**
* Modify an existing table from a {@link SchemaChange}.
*
diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java
index 0f9cf2b64..01719e590 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java
@@ -20,6 +20,7 @@ package org.apache.paimon.catalog;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
+import org.apache.paimon.manifest.PartitionEntry;
import org.apache.paimon.metastore.MetastoreClient;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaChange;
@@ -192,6 +193,12 @@ public class DelegateCatalog implements Catalog {
wrapped.dropPartition(identifier, partitions);
}
+ @Override
+ public List<PartitionEntry> listPartitions(Identifier identifier)
+ throws TableNotExistException {
+ return wrapped.listPartitions(identifier);
+ }
+
@Override
public void repairCatalog() {
wrapped.repairCatalog();
diff --git a/paimon-core/src/test/java/org/apache/paimon/catalog/CachingCatalogTest.java b/paimon-core/src/test/java/org/apache/paimon/catalog/CachingCatalogTest.java
index d645c46bf..0cdd9486c 100644
--- a/paimon-core/src/test/java/org/apache/paimon/catalog/CachingCatalogTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/catalog/CachingCatalogTest.java
@@ -241,7 +241,7 @@ class CachingCatalogTest extends CatalogTestBase {
Collections.emptyMap(),
"");
catalog.createTable(tableIdent, schema, false);
- List<PartitionEntry> partitionEntryList = catalog.getPartitions(tableIdent);
+ List<PartitionEntry> partitionEntryList = catalog.listPartitions(tableIdent);
assertThat(catalog.partitionCache().asMap()).containsKey(tableIdent);
List<PartitionEntry> partitionEntryListFromCache =
catalog.partitionCache().getIfPresent(tableIdent);