This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-1.0
in repository https://gitbox.apache.org/repos/asf/paimon.git

commit ada85925fc10eddf611967c9062507993774c28f
Author: Jingsong Lee <[email protected]>
AuthorDate: Tue Dec 31 16:57:43 2024 +0800

    [core] Introduce Partition to Catalog.listPartitions (#4807)
---
 .../org/apache/paimon/catalog/AbstractCatalog.java |   8 +-
 .../org/apache/paimon/catalog/CachingCatalog.java  |  16 ++-
 .../java/org/apache/paimon/catalog/Catalog.java    |   6 +-
 .../org/apache/paimon/catalog/CatalogUtils.java    |  32 +++++
 .../org/apache/paimon/catalog/DelegateCatalog.java |   5 +-
 .../apache/paimon/metastore/MetastoreClient.java   |   6 +-
 .../apache/paimon/metastore/PartitionStats.java    |  64 ----------
 .../org/apache/paimon/partition/Partition.java     | 135 +++++++++++++++++++++
 .../java/org/apache/paimon/rest/RESTCatalog.java   |   5 +-
 .../apache/paimon/catalog/CachingCatalogTest.java  |   6 +-
 .../paimon/catalog/TestableCachingCatalog.java     |   4 +-
 .../partition/PartitionStatisticsReporter.java     |   8 +-
 .../sink/partition/AddDonePartitionActionTest.java |   6 +-
 .../partition/PartitionStatisticsReporterTest.java |  16 +--
 .../apache/paimon/hive/HiveMetastoreClient.java    |  25 ++--
 15 files changed, 222 insertions(+), 120 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
index d7447c37dd..14457a3698 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
@@ -24,11 +24,11 @@ import org.apache.paimon.factories.FactoryUtil;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.FileStatus;
 import org.apache.paimon.fs.Path;
-import org.apache.paimon.manifest.PartitionEntry;
 import org.apache.paimon.metastore.MetastoreClient;
 import org.apache.paimon.operation.FileStoreCommit;
 import org.apache.paimon.operation.Lock;
 import org.apache.paimon.options.Options;
+import org.apache.paimon.partition.Partition;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.schema.SchemaChange;
 import org.apache.paimon.schema.SchemaManager;
@@ -59,6 +59,7 @@ import java.util.stream.Collectors;
 
 import static org.apache.paimon.CoreOptions.TYPE;
 import static org.apache.paimon.CoreOptions.createCommitUser;
+import static org.apache.paimon.catalog.CatalogUtils.listPartitionsFromFileSystem;
 import static org.apache.paimon.options.CatalogOptions.LOCK_ENABLED;
 import static org.apache.paimon.options.CatalogOptions.LOCK_TYPE;
 import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH;
@@ -193,9 +194,8 @@ public abstract class AbstractCatalog implements Catalog {
     }
 
     @Override
-    public List<PartitionEntry> listPartitions(Identifier identifier)
-            throws TableNotExistException {
-        return getTable(identifier).newReadBuilder().newScan().listPartitionEntries();
+    public List<Partition> listPartitions(Identifier identifier) throws TableNotExistException {
+        return listPartitionsFromFileSystem(getTable(identifier));
     }
 
     protected abstract void createDatabaseImpl(String name, Map<String, String> properties);
diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java
index 34e53f32f2..4796276972 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java
@@ -19,9 +19,9 @@
 package org.apache.paimon.catalog;
 
 import org.apache.paimon.fs.Path;
-import org.apache.paimon.manifest.PartitionEntry;
 import org.apache.paimon.options.MemorySize;
 import org.apache.paimon.options.Options;
+import org.apache.paimon.partition.Partition;
 import org.apache.paimon.schema.SchemaChange;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.Table;
@@ -61,7 +61,7 @@ public class CachingCatalog extends DelegateCatalog {
     @Nullable protected final SegmentsCache<Path> manifestCache;
 
     // partition cache will affect data latency
-    @Nullable protected final Cache<Identifier, List<PartitionEntry>> partitionCache;
+    @Nullable protected final Cache<Identifier, List<Partition>> partitionCache;
 
     public CachingCatalog(Catalog wrapped) {
         this(
@@ -130,7 +130,7 @@ public class CachingCatalog extends DelegateCatalog {
                                 .executor(Runnable::run)
                                 .expireAfterAccess(expirationInterval)
                                 .weigher(
-                                        (Weigher<Identifier, List<PartitionEntry>>)
+                                        (Weigher<Identifier, List<Partition>>)
                                                 (identifier, v) -> v.size())
                                 .maximumWeight(cachedPartitionMaxNum)
                                 .ticker(ticker)
@@ -281,13 +281,12 @@ public class CachingCatalog extends DelegateCatalog {
     }
 
     @Override
-    public List<PartitionEntry> listPartitions(Identifier identifier)
-            throws TableNotExistException {
+    public List<Partition> listPartitions(Identifier identifier) throws TableNotExistException {
         if (partitionCache == null) {
             return wrapped.listPartitions(identifier);
         }
 
-        List<PartitionEntry> result = partitionCache.getIfPresent(identifier);
+        List<Partition> result = partitionCache.getIfPresent(identifier);
         if (result == null) {
             result = wrapped.listPartitions(identifier);
             partitionCache.put(identifier, result);
@@ -321,7 +320,7 @@ public class CachingCatalog extends DelegateCatalog {
      */
     public void refreshPartitions(Identifier identifier) throws TableNotExistException {
         if (partitionCache != null) {
-            List<PartitionEntry> result = wrapped.listPartitions(identifier);
+            List<Partition> result = wrapped.listPartitions(identifier);
             partitionCache.put(identifier, result);
         }
     }
@@ -341,8 +340,7 @@ public class CachingCatalog extends DelegateCatalog {
         }
         long partitionCacheSize = 0L;
         if (partitionCache != null) {
-            for (Map.Entry<Identifier, List<PartitionEntry>> entry :
-                    partitionCache.asMap().entrySet()) {
+            for (Map.Entry<Identifier, List<Partition>> entry : partitionCache.asMap().entrySet()) {
                 partitionCacheSize += entry.getValue().size();
             }
         }
diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
index 37ea6fa5e2..70daba4186 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
@@ -20,7 +20,7 @@ package org.apache.paimon.catalog;
 
 import org.apache.paimon.annotation.Public;
 import org.apache.paimon.fs.FileIO;
-import org.apache.paimon.manifest.PartitionEntry;
+import org.apache.paimon.partition.Partition;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.schema.SchemaChange;
 import org.apache.paimon.table.Table;
@@ -255,12 +255,12 @@ public interface Catalog extends AutoCloseable {
             throws TableNotExistException, PartitionNotExistException;
 
     /**
-     * Get PartitionEntry of all partitions of the table.
+     * Get Partition of all partitions of the table.
      *
      * @param identifier path of the table to list partitions
      * @throws TableNotExistException if the table does not exist
      */
-    List<PartitionEntry> listPartitions(Identifier identifier) throws TableNotExistException;
+    List<Partition> listPartitions(Identifier identifier) throws TableNotExistException;
 
     /**
      * Modify an existing table from a {@link SchemaChange}.
diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java b/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java
index 043da0504d..301c7136e9 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java
@@ -19,10 +19,19 @@
 package org.apache.paimon.catalog;
 
 import org.apache.paimon.fs.Path;
+import org.apache.paimon.manifest.PartitionEntry;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.partition.Partition;
 import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.utils.InternalRowPartitionComputer;
 
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
 
+import static org.apache.paimon.CoreOptions.PARTITION_DEFAULT_NAME;
+import static org.apache.paimon.CoreOptions.PARTITION_GENERATE_LEGCY_NAME;
 import static org.apache.paimon.catalog.Catalog.TABLE_DEFAULT_OPTION_PREFIX;
 import static org.apache.paimon.options.OptionsUtils.convertToPropertiesPrefixKey;
 
@@ -60,4 +69,27 @@ public class CatalogUtils {
     public static Map<String, String> tableDefaultOptions(Map<String, String> options) {
         return convertToPropertiesPrefixKey(options, TABLE_DEFAULT_OPTION_PREFIX);
     }
+
+    public static List<Partition> listPartitionsFromFileSystem(Table table) {
+        Options options = Options.fromMap(table.options());
+        InternalRowPartitionComputer computer =
+                new InternalRowPartitionComputer(
+                        options.get(PARTITION_DEFAULT_NAME),
+                        table.rowType(),
+                        table.partitionKeys().toArray(new String[0]),
+                        options.get(PARTITION_GENERATE_LEGCY_NAME));
+        List<PartitionEntry> partitionEntries =
+                table.newReadBuilder().newScan().listPartitionEntries();
+        List<Partition> partitions = new ArrayList<>(partitionEntries.size());
+        for (PartitionEntry entry : partitionEntries) {
+            partitions.add(
+                    new Partition(
+                            computer.generatePartValues(entry.partition()),
+                            entry.recordCount(),
+                            entry.fileSizeInBytes(),
+                            entry.fileCount(),
+                            entry.lastFileCreationTime()));
+        }
+        return partitions;
+    }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java
index 968f00cfca..e2d1a94cfa 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java
@@ -19,7 +19,7 @@
 package org.apache.paimon.catalog;
 
 import org.apache.paimon.fs.FileIO;
-import org.apache.paimon.manifest.PartitionEntry;
+import org.apache.paimon.partition.Partition;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.schema.SchemaChange;
 import org.apache.paimon.table.Table;
@@ -165,8 +165,7 @@ public class DelegateCatalog implements Catalog {
     }
 
     @Override
-    public List<PartitionEntry> listPartitions(Identifier identifier)
-            throws TableNotExistException {
+    public List<Partition> listPartitions(Identifier identifier) throws TableNotExistException {
         return wrapped.listPartitions(identifier);
     }
 
diff --git a/paimon-core/src/main/java/org/apache/paimon/metastore/MetastoreClient.java b/paimon-core/src/main/java/org/apache/paimon/metastore/MetastoreClient.java
index ccf5f38538..f24049eca9 100644
--- a/paimon-core/src/main/java/org/apache/paimon/metastore/MetastoreClient.java
+++ b/paimon-core/src/main/java/org/apache/paimon/metastore/MetastoreClient.java
@@ -18,6 +18,8 @@
 
 package org.apache.paimon.metastore;
 
+import org.apache.paimon.partition.Partition;
+
 import java.io.Serializable;
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -38,9 +40,7 @@ public interface MetastoreClient extends AutoCloseable {
 
     void markPartitionDone(LinkedHashMap<String, String> partition) throws Exception;
 
-    default void alterPartition(
-            LinkedHashMap<String, String> partition, PartitionStats partitionStats)
-            throws Exception {
+    default void alterPartition(Partition partition) throws Exception {
         throw new UnsupportedOperationException();
     }
 
diff --git a/paimon-core/src/main/java/org/apache/paimon/metastore/PartitionStats.java b/paimon-core/src/main/java/org/apache/paimon/metastore/PartitionStats.java
deleted file mode 100644
index eacc400f52..0000000000
--- a/paimon-core/src/main/java/org/apache/paimon/metastore/PartitionStats.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.metastore;
-
-/** Statistic for partition. */
-public interface PartitionStats {
-
-    long numFiles();
-
-    long totalSize();
-
-    long numRows();
-
-    long lastUpdateTimeMillis();
-
-    static PartitionStats create(
-            long numFiles, long totalSize, long numRows, long lastUpdateTimeMillis) {
-        return new PartitionStats() {
-
-            @Override
-            public long numFiles() {
-                return numFiles;
-            }
-
-            @Override
-            public long totalSize() {
-                return totalSize;
-            }
-
-            @Override
-            public long numRows() {
-                return numRows;
-            }
-
-            @Override
-            public long lastUpdateTimeMillis() {
-                return lastUpdateTimeMillis;
-            }
-
-            @Override
-            public String toString() {
-                return String.format(
-                        "numFiles: %s, totalSize: %s, numRows: %s, lastUpdateTimeMillis: %s",
-                        numFiles, totalSize, numRows, lastUpdateTimeMillis);
-            }
-        };
-    }
-}
diff --git a/paimon-core/src/main/java/org/apache/paimon/partition/Partition.java b/paimon-core/src/main/java/org/apache/paimon/partition/Partition.java
new file mode 100644
index 0000000000..b13082fb44
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/partition/Partition.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.partition;
+
+import org.apache.paimon.annotation.Public;
+
+import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonGetter;
+import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.io.Serializable;
+import java.util.Map;
+import java.util.Objects;
+
+/** Entry representing a partition. */
+@JsonIgnoreProperties(ignoreUnknown = true)
+@Public
+public class Partition implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    public static final String FIELD_SPEC = "spec";
+    public static final String FIELD_RECORD_COUNT = "recordCount";
+    public static final String FIELD_FILE_SIZE_IN_BYTES = "fileSizeInBytes";
+    public static final String FIELD_FILE_COUNT = "fileCount";
+    public static final String FIELD_LAST_FILE_CREATION_TIME = "lastFileCreationTime";
+
+    @JsonProperty(FIELD_SPEC)
+    private final Map<String, String> spec;
+
+    @JsonProperty(FIELD_RECORD_COUNT)
+    private final long recordCount;
+
+    @JsonProperty(FIELD_FILE_SIZE_IN_BYTES)
+    private final long fileSizeInBytes;
+
+    @JsonProperty(FIELD_FILE_COUNT)
+    private final long fileCount;
+
+    @JsonProperty(FIELD_LAST_FILE_CREATION_TIME)
+    private final long lastFileCreationTime;
+
+    @JsonCreator
+    public Partition(
+            @JsonProperty(FIELD_SPEC) Map<String, String> spec,
+            @JsonProperty(FIELD_RECORD_COUNT) long recordCount,
+            @JsonProperty(FIELD_FILE_SIZE_IN_BYTES) long fileSizeInBytes,
+            @JsonProperty(FIELD_FILE_COUNT) long fileCount,
+            @JsonProperty(FIELD_LAST_FILE_CREATION_TIME) long lastFileCreationTime) {
+        this.spec = spec;
+        this.recordCount = recordCount;
+        this.fileSizeInBytes = fileSizeInBytes;
+        this.fileCount = fileCount;
+        this.lastFileCreationTime = lastFileCreationTime;
+    }
+
+    @JsonGetter(FIELD_SPEC)
+    public Map<String, String> spec() {
+        return spec;
+    }
+
+    @JsonGetter(FIELD_RECORD_COUNT)
+    public long recordCount() {
+        return recordCount;
+    }
+
+    @JsonGetter(FIELD_FILE_SIZE_IN_BYTES)
+    public long fileSizeInBytes() {
+        return fileSizeInBytes;
+    }
+
+    @JsonGetter(FIELD_FILE_COUNT)
+    public long fileCount() {
+        return fileCount;
+    }
+
+    @JsonGetter(FIELD_LAST_FILE_CREATION_TIME)
+    public long lastFileCreationTime() {
+        return lastFileCreationTime;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        Partition that = (Partition) o;
+        return recordCount == that.recordCount
+                && fileSizeInBytes == that.fileSizeInBytes
+                && fileCount == that.fileCount
+                && lastFileCreationTime == that.lastFileCreationTime
+                && Objects.equals(spec, that.spec);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(spec, recordCount, fileSizeInBytes, fileCount, lastFileCreationTime);
+    }
+
+    @Override
+    public String toString() {
+        return "{"
+                + "spec="
+                + spec
+                + ", recordCount="
+                + recordCount
+                + ", fileSizeInBytes="
+                + fileSizeInBytes
+                + ", fileCount="
+                + fileCount
+                + ", lastFileCreationTime="
+                + lastFileCreationTime
+                + '}';
+    }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
index 8b53bef848..b604815209 100644
--- a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
@@ -23,9 +23,9 @@ import org.apache.paimon.catalog.Database;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.catalog.PropertyChange;
 import org.apache.paimon.fs.FileIO;
-import org.apache.paimon.manifest.PartitionEntry;
 import org.apache.paimon.options.CatalogOptions;
 import org.apache.paimon.options.Options;
+import org.apache.paimon.partition.Partition;
 import org.apache.paimon.rest.auth.AuthSession;
 import org.apache.paimon.rest.auth.CredentialsProvider;
 import org.apache.paimon.rest.auth.CredentialsProviderFactory;
@@ -251,8 +251,7 @@ public class RESTCatalog implements Catalog {
             throws TableNotExistException, PartitionNotExistException {}
 
     @Override
-    public List<PartitionEntry> listPartitions(Identifier identifier)
-            throws TableNotExistException {
+    public List<Partition> listPartitions(Identifier identifier) throws TableNotExistException {
         throw new UnsupportedOperationException();
     }
 
diff --git a/paimon-core/src/test/java/org/apache/paimon/catalog/CachingCatalogTest.java b/paimon-core/src/test/java/org/apache/paimon/catalog/CachingCatalogTest.java
index fee6d14331..c028fa7421 100644
--- a/paimon-core/src/test/java/org/apache/paimon/catalog/CachingCatalogTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/catalog/CachingCatalogTest.java
@@ -21,9 +21,9 @@ package org.apache.paimon.catalog;
 import org.apache.paimon.Snapshot;
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.fs.Path;
-import org.apache.paimon.manifest.PartitionEntry;
 import org.apache.paimon.options.MemorySize;
 import org.apache.paimon.options.Options;
+import org.apache.paimon.partition.Partition;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.schema.SchemaChange;
 import org.apache.paimon.table.Table;
@@ -245,12 +245,12 @@ class CachingCatalogTest extends CatalogTestBase {
                         Collections.emptyMap(),
                         "");
         catalog.createTable(tableIdent, schema, false);
-        List<PartitionEntry> partitionEntryList = catalog.listPartitions(tableIdent);
+        List<Partition> partitionEntryList = catalog.listPartitions(tableIdent);
         assertThat(catalog.partitionCache().asMap()).containsKey(tableIdent);
         catalog.invalidateTable(tableIdent);
         catalog.refreshPartitions(tableIdent);
         assertThat(catalog.partitionCache().asMap()).containsKey(tableIdent);
-        List<PartitionEntry> partitionEntryListFromCache =
+        List<Partition> partitionEntryListFromCache =
                 catalog.partitionCache().getIfPresent(tableIdent);
         assertThat(partitionEntryListFromCache).isNotNull();
         assertThat(partitionEntryListFromCache).containsAll(partitionEntryList);
diff --git a/paimon-core/src/test/java/org/apache/paimon/catalog/TestableCachingCatalog.java b/paimon-core/src/test/java/org/apache/paimon/catalog/TestableCachingCatalog.java
index 1d4a9b0e8a..0eaf23a1a2 100644
--- a/paimon-core/src/test/java/org/apache/paimon/catalog/TestableCachingCatalog.java
+++ b/paimon-core/src/test/java/org/apache/paimon/catalog/TestableCachingCatalog.java
@@ -18,8 +18,8 @@
 
 package org.apache.paimon.catalog;
 
-import org.apache.paimon.manifest.PartitionEntry;
 import org.apache.paimon.options.MemorySize;
+import org.apache.paimon.partition.Partition;
 import org.apache.paimon.table.Table;
 
 import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache;
@@ -56,7 +56,7 @@ public class TestableCachingCatalog extends CachingCatalog {
         return tableCache;
     }
 
-    public Cache<Identifier, List<PartitionEntry>> partitionCache() {
+    public Cache<Identifier, List<Partition>> partitionCache() {
         partitionCache.cleanUp();
         return partitionCache;
     }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionStatisticsReporter.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionStatisticsReporter.java
index ced37726f1..84542af476 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionStatisticsReporter.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionStatisticsReporter.java
@@ -22,7 +22,7 @@ import org.apache.paimon.Snapshot;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.metastore.MetastoreClient;
-import org.apache.paimon.metastore.PartitionStats;
+import org.apache.paimon.partition.Partition;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.source.DataSplit;
 import org.apache.paimon.table.source.ScanMode;
@@ -82,10 +82,10 @@ public class PartitionStatisticsReporter implements Closeable {
                 }
             }
 
-            PartitionStats partitionStats =
-                    PartitionStats.create(fileCount, totalSize, rowCount, modifyTimeMillis);
+            Partition partitionStats =
+                    new Partition(partitionSpec, rowCount, totalSize, fileCount, modifyTimeMillis);
             LOG.info("alter partition {} with statistic {}.", partitionSpec, partitionStats);
-            metastoreClient.alterPartition(partitionSpec, partitionStats);
+            metastoreClient.alterPartition(partitionStats);
         }
     }
 
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/AddDonePartitionActionTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/AddDonePartitionActionTest.java
index 3bdbdd20ad..3c5cd2f8e9 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/AddDonePartitionActionTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/AddDonePartitionActionTest.java
@@ -19,7 +19,7 @@
 package org.apache.paimon.flink.sink.partition;
 
 import org.apache.paimon.metastore.MetastoreClient;
-import org.apache.paimon.metastore.PartitionStats;
+import org.apache.paimon.partition.Partition;
 import org.apache.paimon.partition.actions.AddDonePartitionAction;
 
 import org.junit.jupiter.api.Test;
@@ -68,9 +68,7 @@ class AddDonePartitionActionTest {
                     }
 
                     @Override
-                    public void alterPartition(
-                            LinkedHashMap<String, String> partitionSpec,
-                            PartitionStats partitionStats) {
+                    public void alterPartition(Partition partition) {
                         throw new UnsupportedOperationException();
                     }
 
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionStatisticsReporterTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionStatisticsReporterTest.java
index 0f761efa22..3c01772d6d 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionStatisticsReporterTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionStatisticsReporterTest.java
@@ -23,7 +23,7 @@ import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.fs.local.LocalFileIO;
 import org.apache.paimon.metastore.MetastoreClient;
-import org.apache.paimon.metastore.PartitionStats;
+import org.apache.paimon.partition.Partition;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.table.FileStoreTable;
@@ -85,7 +85,7 @@ public class PartitionStatisticsReporterTest {
         BatchTableCommit committer = table.newBatchWriteBuilder().newCommit();
         committer.commit(messages);
         AtomicBoolean closed = new AtomicBoolean(false);
-        Map<String, PartitionStats> partitionParams = Maps.newHashMap();
+        Map<String, Partition> partitionParams = Maps.newHashMap();
 
         MetastoreClient client =
                 new MetastoreClient() {
@@ -116,12 +116,12 @@ public class PartitionStatisticsReporterTest {
                     }
 
                     @Override
-                    public void alterPartition(
-                            LinkedHashMap<String, String> partitionSpec,
-                            PartitionStats partitionStats) {
+                    public void alterPartition(Partition partition) {
                         partitionParams.put(
-                                PartitionPathUtils.generatePartitionPath(partitionSpec),
-                                partitionStats);
+                                PartitionPathUtils.generatePartitionPath(
+                                        partition.spec(),
+                                        table.rowType().project(table.partitionKeys())),
+                                partition);
                     }
 
                     @Override
@@ -136,7 +136,7 @@ public class PartitionStatisticsReporterTest {
         Assertions.assertThat(partitionParams).containsKey("c1=a/");
         Assertions.assertThat(partitionParams.get("c1=a/").toString())
                 .isEqualTo(
-                        "numFiles: 1, totalSize: 591, numRows: 1, lastUpdateTimeMillis: 1729598544974");
+                        "{spec={c1=a}, recordCount=1, fileSizeInBytes=591, fileCount=1, lastFileCreationTime=1729598544974}");
         action.close();
         Assertions.assertThat(closed).isTrue();
     }
diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveMetastoreClient.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveMetastoreClient.java
index 0661988648..755b2df206 100644
--- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveMetastoreClient.java
+++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveMetastoreClient.java
@@ -23,13 +23,13 @@ import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.client.ClientPool;
 import org.apache.paimon.hive.pool.CachedClientPool;
 import org.apache.paimon.metastore.MetastoreClient;
-import org.apache.paimon.metastore.PartitionStats;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.utils.PartitionPathUtils;
 
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.PartitionEventType;
@@ -57,6 +57,7 @@ public class HiveMetastoreClient implements MetastoreClient {
     private final Identifier identifier;
 
     private final ClientPool<IMetaStoreClient, TException> clients;
+    private final List<String> partitionKeys;
     private final StorageDescriptor sd;
     private final String dataFilePath;
 
@@ -69,6 +70,10 @@ public class HiveMetastoreClient implements MetastoreClient {
                         client ->
                                 client.getTable(
                                         identifier.getDatabaseName(), identifier.getTableName()));
+        this.partitionKeys =
+                table.getPartitionKeys().stream()
+                        .map(FieldSchema::getName)
+                        .collect(Collectors.toList());
         this.sd = table.getSd();
         this.dataFilePath =
                 
table.getParameters().containsKey(CoreOptions.DATA_FILE_PATH_DIRECTORY.key())
@@ -103,17 +108,17 @@ public class HiveMetastoreClient implements MetastoreClient {
     }
 
     @Override
-    public void alterPartition(
-            LinkedHashMap<String, String> partition, PartitionStats partitionStats)
-            throws Exception {
-        List<String> partitionValues = new ArrayList<>(partition.values());
+    public void alterPartition(org.apache.paimon.partition.Partition partition) throws Exception {
+        Map<String, String> spec = partition.spec();
+        List<String> partitionValues =
+                partitionKeys.stream().map(spec::get).collect(Collectors.toList());
 
         Map<String, String> statistic = new HashMap<>();
-        statistic.put(NUM_FILES_PROP, String.valueOf(partitionStats.numFiles()));
-        statistic.put(TOTAL_SIZE_PROP, String.valueOf(partitionStats.totalSize()));
-        statistic.put(NUM_ROWS_PROP, String.valueOf(partitionStats.numRows()));
+        statistic.put(NUM_FILES_PROP, String.valueOf(partition.fileCount()));
+        statistic.put(TOTAL_SIZE_PROP, String.valueOf(partition.fileSizeInBytes()));
+        statistic.put(NUM_ROWS_PROP, String.valueOf(partition.recordCount()));
 
-        String modifyTimeSeconds = String.valueOf(partitionStats.lastUpdateTimeMillis() / 1000);
+        String modifyTimeSeconds = String.valueOf(partition.lastFileCreationTime() / 1000);
         statistic.put(LAST_UPDATE_TIME_PROP, modifyTimeSeconds);
 
         // just for being compatible with hive metastore
@@ -128,7 +133,7 @@ public class HiveMetastoreClient implements MetastoreClient {
                                             identifier.getObjectName(),
                                             partitionValues));
             hivePartition.setValues(partitionValues);
-            hivePartition.setLastAccessTime((int) (partitionStats.lastUpdateTimeMillis() / 1000));
+            hivePartition.setLastAccessTime((int) (partition.lastFileCreationTime() / 1000));
             hivePartition.getParameters().putAll(statistic);
             clients.execute(
                     client ->

Reply via email to