This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 2f4c8d0d6 [core] Introduce listPartitions interface to Catalog
2f4c8d0d6 is described below

commit 2f4c8d0d67f0513acc8868cf44b3672b13839c07
Author: Jingsong <[email protected]>
AuthorDate: Tue Nov 5 13:48:46 2024 +0800

    [core] Introduce listPartitions interface to Catalog
---
 .../org/apache/paimon/catalog/AbstractCatalog.java |  7 +++
 .../org/apache/paimon/catalog/CachingCatalog.java  | 52 +++++++---------------
 .../java/org/apache/paimon/catalog/Catalog.java    |  9 ++++
 .../org/apache/paimon/catalog/DelegateCatalog.java |  7 +++
 .../apache/paimon/catalog/CachingCatalogTest.java  |  2 +-
 5 files changed, 41 insertions(+), 36 deletions(-)

diff --git 
a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java 
b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
index 1b390bc66..b3f255f10 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
@@ -24,6 +24,7 @@ import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.FileStatus;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.lineage.LineageMetaFactory;
+import org.apache.paimon.manifest.PartitionEntry;
 import org.apache.paimon.metastore.MetastoreClient;
 import org.apache.paimon.operation.FileStoreCommit;
 import org.apache.paimon.operation.Lock;
@@ -198,6 +199,12 @@ public abstract class AbstractCatalog implements Catalog {
         }
     }
 
+    @Override
+    public List<PartitionEntry> listPartitions(Identifier identifier)
+            throws TableNotExistException {
+        return 
getTable(identifier).newReadBuilder().newScan().listPartitionEntries();
+    }
+
     protected abstract void createDatabaseImpl(String name, Map<String, 
String> properties);
 
     @Override
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java 
b/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java
index 444a828af..b1f683e40 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java
@@ -34,6 +34,7 @@ import 
org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Caff
 import 
org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.RemovalCause;
 import 
org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.RemovalListener;
 import 
org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Ticker;
+import 
org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Weigher;
 
 import org.checkerframework.checker.nullness.qual.NonNull;
 import org.slf4j.Logger;
@@ -63,9 +64,10 @@ public class CachingCatalog extends DelegateCatalog {
 
     protected final Cache<String, Map<String, String>> databaseCache;
     protected final Cache<Identifier, Table> tableCache;
-    @Nullable protected final Cache<Identifier, List<PartitionEntry>> 
partitionCache;
     @Nullable protected final SegmentsCache<Path> manifestCache;
-    private final long cachedPartitionMaxNum;
+
+    // partition cache will affect data latency
+    @Nullable protected final Cache<Identifier, List<PartitionEntry>> 
partitionCache;
 
     public CachingCatalog(Catalog wrapped) {
         this(
@@ -126,12 +128,13 @@ public class CachingCatalog extends DelegateCatalog {
                                 .softValues()
                                 .executor(Runnable::run)
                                 .expireAfterAccess(expirationInterval)
-                                .weigher(this::weigh)
+                                .weigher(
+                                        (Weigher<Identifier, 
List<PartitionEntry>>)
+                                                (identifier, v) -> v.size())
                                 .maximumWeight(cachedPartitionMaxNum)
                                 .ticker(ticker)
                                 .build();
         this.manifestCache = SegmentsCache.create(manifestMaxMemory, 
manifestCacheThreshold);
-        this.cachedPartitionMaxNum = cachedPartitionMaxNum;
     }
 
     public static Catalog tryToCreate(Catalog catalog, Options options) {
@@ -248,40 +251,19 @@ public class CachingCatalog extends DelegateCatalog {
         tableCache.put(identifier, table);
     }
 
-    public List<PartitionEntry> getPartitions(Identifier identifier) throws 
TableNotExistException {
-        Table table = this.getTable(identifier);
-        if (partitionCacheEnabled(table)) {
-            List<PartitionEntry> partitions;
-            partitions = partitionCache.getIfPresent(identifier);
-            if (partitions == null || partitions.isEmpty()) {
-                partitions = this.refreshPartitions(identifier);
-            }
-            return partitions;
-        }
-        return ((FileStoreTable) table).newSnapshotReader().partitionEntries();
-    }
-
-    public List<PartitionEntry> refreshPartitions(Identifier identifier)
+    @Override
+    public List<PartitionEntry> listPartitions(Identifier identifier)
             throws TableNotExistException {
-        Table table = this.getTable(identifier);
-        List<PartitionEntry> partitions =
-                ((FileStoreTable) 
table).newSnapshotReader().partitionEntries();
-        if (partitionCacheEnabled(table)
-                && 
partitionCache.asMap().values().stream().mapToInt(List::size).sum()
-                        < this.cachedPartitionMaxNum) {
-            partitionCache.put(identifier, partitions);
+        if (partitionCache == null) {
+            return wrapped.listPartitions(identifier);
         }
-        return partitions;
-    }
 
-    private boolean partitionCacheEnabled(Table table) {
-        return partitionCache != null
-                && table instanceof FileStoreTable
-                && !table.partitionKeys().isEmpty();
-    }
-
-    private int weigh(Identifier identifier, List<PartitionEntry> partitions) {
-        return partitions.size();
+        List<PartitionEntry> result = partitionCache.getIfPresent(identifier);
+        if (result == null) {
+            result = wrapped.listPartitions(identifier);
+            partitionCache.put(identifier, result);
+        }
+        return result;
     }
 
     @Override
diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java 
b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
index 84e924a2a..c72c354e4 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
@@ -21,6 +21,7 @@ package org.apache.paimon.catalog;
 import org.apache.paimon.annotation.Public;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
+import org.apache.paimon.manifest.PartitionEntry;
 import org.apache.paimon.metastore.MetastoreClient;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.schema.SchemaChange;
@@ -290,6 +291,14 @@ public interface Catalog extends AutoCloseable {
     void dropPartition(Identifier identifier, Map<String, String> partitions)
             throws TableNotExistException, PartitionNotExistException;
 
+    /**
+     * Get PartitionEntry of all partitions of the table.
+     *
+     * @param identifier path of the table to list partitions
+     * @throws TableNotExistException if the table does not exist
+     */
+    List<PartitionEntry> listPartitions(Identifier identifier) throws 
TableNotExistException;
+
     /**
      * Modify an existing table from a {@link SchemaChange}.
      *
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java 
b/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java
index 0f9cf2b64..01719e590 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java
@@ -20,6 +20,7 @@ package org.apache.paimon.catalog;
 
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
+import org.apache.paimon.manifest.PartitionEntry;
 import org.apache.paimon.metastore.MetastoreClient;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.schema.SchemaChange;
@@ -192,6 +193,12 @@ public class DelegateCatalog implements Catalog {
         wrapped.dropPartition(identifier, partitions);
     }
 
+    @Override
+    public List<PartitionEntry> listPartitions(Identifier identifier)
+            throws TableNotExistException {
+        return wrapped.listPartitions(identifier);
+    }
+
     @Override
     public void repairCatalog() {
         wrapped.repairCatalog();
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/catalog/CachingCatalogTest.java 
b/paimon-core/src/test/java/org/apache/paimon/catalog/CachingCatalogTest.java
index d645c46bf..0cdd9486c 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/catalog/CachingCatalogTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/catalog/CachingCatalogTest.java
@@ -241,7 +241,7 @@ class CachingCatalogTest extends CatalogTestBase {
                         Collections.emptyMap(),
                         "");
         catalog.createTable(tableIdent, schema, false);
-        List<PartitionEntry> partitionEntryList = 
catalog.getPartitions(tableIdent);
+        List<PartitionEntry> partitionEntryList = 
catalog.listPartitions(tableIdent);
         assertThat(catalog.partitionCache().asMap()).containsKey(tableIdent);
         List<PartitionEntry> partitionEntryListFromCache =
                 catalog.partitionCache().getIfPresent(tableIdent);

Reply via email to