This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 659cf39957 [core] Refactor MetastoreClient methods to simplify catalog 
(#4726)
659cf39957 is described below

commit 659cf3995752432aa939569ee244e548f70ec77e
Author: Jingsong Lee <[email protected]>
AuthorDate: Tue Dec 17 13:54:31 2024 +0800

    [core] Refactor MetastoreClient methods to simplify catalog (#4726)
---
 .../org/apache/paimon/catalog/AbstractCatalog.java |   5 +-
 .../metastore/AddPartitionCommitCallback.java      |  11 +-
 .../paimon/metastore/AddPartitionTagCallback.java  |   2 +-
 .../apache/paimon/metastore/MetastoreClient.java   |  29 ++---
 .../apache/paimon/metastore/PartitionStats.java    |  64 +++++++++++
 .../apache/paimon/operation/PartitionExpire.java   |  16 ++-
 .../actions/MarkPartitionDoneEventAction.java      |   2 +-
 .../paimon/table/AbstractFileStoreTable.java       |  11 +-
 .../paimon/operation/PartitionExpireTest.java      |  65 +++++++++++-
 .../partition/PartitionStatisticsReporter.java     |  27 ++---
 .../sink/partition/AddDonePartitionActionTest.java |  30 +++---
 .../partition/PartitionStatisticsReporterTest.java |  45 +++-----
 .../java/org/apache/paimon/hive/HiveCatalog.java   |  31 +++---
 .../apache/paimon/hive/HiveMetastoreClient.java    | 118 +++++++++------------
 .../paimon/spark/PaimonPartitionManagement.scala   |   2 +-
 15 files changed, 267 insertions(+), 191 deletions(-)

diff --git 
a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java 
b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
index db69092955..3fdefe6cac 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
@@ -417,7 +417,7 @@ public abstract class AbstractCatalog implements Catalog {
                                         lockFactory().orElse(null),
                                         lockContext().orElse(null),
                                         identifier),
-                                metastoreClientFactory(identifier, 
tableMeta.schema).orElse(null)));
+                                
metastoreClientFactory(identifier).orElse(null)));
         CoreOptions options = table.coreOptions();
         if (options.type() == TableType.OBJECT_TABLE) {
             String objectLocation = options.objectLocation();
@@ -485,8 +485,7 @@ public abstract class AbstractCatalog implements Catalog {
             throws TableNotExistException;
 
     /** Get metastore client factory for the table specified by {@code 
identifier}. */
-    public Optional<MetastoreClient.Factory> metastoreClientFactory(
-            Identifier identifier, TableSchema schema) {
+    public Optional<MetastoreClient.Factory> metastoreClientFactory(Identifier 
identifier) {
         return Optional.empty();
     }
 
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/metastore/AddPartitionCommitCallback.java
 
b/paimon-core/src/main/java/org/apache/paimon/metastore/AddPartitionCommitCallback.java
index 599f88e512..26fb9ed48d 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/metastore/AddPartitionCommitCallback.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/metastore/AddPartitionCommitCallback.java
@@ -25,6 +25,7 @@ import org.apache.paimon.manifest.ManifestCommittable;
 import org.apache.paimon.manifest.ManifestEntry;
 import org.apache.paimon.table.sink.CommitCallback;
 import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.utils.InternalRowPartitionComputer;
 
 import org.apache.paimon.shade.guava30.com.google.common.cache.Cache;
 import org.apache.paimon.shade.guava30.com.google.common.cache.CacheBuilder;
@@ -48,9 +49,12 @@ public class AddPartitionCommitCallback implements 
CommitCallback {
                     .build();
 
     private final MetastoreClient client;
+    private final InternalRowPartitionComputer partitionComputer;
 
-    public AddPartitionCommitCallback(MetastoreClient client) {
+    public AddPartitionCommitCallback(
+            MetastoreClient client, InternalRowPartitionComputer 
partitionComputer) {
         this.client = client;
+        this.partitionComputer = partitionComputer;
     }
 
     @Override
@@ -81,7 +85,10 @@ public class AddPartitionCommitCallback implements 
CommitCallback {
                 }
             }
             if (!newPartitions.isEmpty()) {
-                client.addPartitions(newPartitions);
+                client.addPartitions(
+                        newPartitions.stream()
+                                .map(partitionComputer::generatePartValues)
+                                .collect(Collectors.toList()));
                 newPartitions.forEach(partition -> cache.put(partition, true));
             }
         } catch (Exception e) {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/metastore/AddPartitionTagCallback.java
 
b/paimon-core/src/main/java/org/apache/paimon/metastore/AddPartitionTagCallback.java
index 70efe68e83..31bb521e88 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/metastore/AddPartitionTagCallback.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/metastore/AddPartitionTagCallback.java
@@ -49,7 +49,7 @@ public class AddPartitionTagCallback implements TagCallback {
         LinkedHashMap<String, String> partitionSpec = new LinkedHashMap<>();
         partitionSpec.put(partitionField, tagName);
         try {
-            client.deletePartition(partitionSpec);
+            client.dropPartition(partitionSpec);
         } catch (Exception e) {
             throw new RuntimeException(e);
         }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/metastore/MetastoreClient.java 
b/paimon-core/src/main/java/org/apache/paimon/metastore/MetastoreClient.java
index 75f7af5abb..ccf5f38538 100644
--- a/paimon-core/src/main/java/org/apache/paimon/metastore/MetastoreClient.java
+++ b/paimon-core/src/main/java/org/apache/paimon/metastore/MetastoreClient.java
@@ -18,12 +18,9 @@
 
 package org.apache.paimon.metastore;
 
-import org.apache.paimon.data.BinaryRow;
-
 import java.io.Serializable;
 import java.util.LinkedHashMap;
 import java.util.List;
-import java.util.Map;
 
 /**
  * A metastore client related to a table. All methods of this interface 
operate on the same specific
@@ -31,32 +28,18 @@ import java.util.Map;
  */
 public interface MetastoreClient extends AutoCloseable {
 
-    void addPartition(BinaryRow partition) throws Exception;
-
-    default void addPartitions(List<BinaryRow> partitions) throws Exception {
-        for (BinaryRow partition : partitions) {
-            addPartition(partition);
-        }
-    }
+    void addPartition(LinkedHashMap<String, String> partition) throws 
Exception;
 
-    void addPartition(LinkedHashMap<String, String> partitionSpec) throws 
Exception;
+    void addPartitions(List<LinkedHashMap<String, String>> partitions) throws 
Exception;
 
-    default void addPartitionsSpec(List<LinkedHashMap<String, String>> 
partitionSpecsList)
-            throws Exception {
-        for (LinkedHashMap<String, String> partitionSpecs : 
partitionSpecsList) {
-            addPartition(partitionSpecs);
-        }
-    }
+    void dropPartition(LinkedHashMap<String, String> partition) throws 
Exception;
 
-    void deletePartition(LinkedHashMap<String, String> partitionSpec) throws 
Exception;
+    void dropPartitions(List<LinkedHashMap<String, String>> partitions) throws 
Exception;
 
-    void markDone(LinkedHashMap<String, String> partitionSpec) throws 
Exception;
+    void markPartitionDone(LinkedHashMap<String, String> partition) throws 
Exception;
 
     default void alterPartition(
-            LinkedHashMap<String, String> partitionSpec,
-            Map<String, String> parameters,
-            long modifyTime,
-            boolean ignoreIfNotExist)
+            LinkedHashMap<String, String> partition, PartitionStats 
partitionStats)
             throws Exception {
         throw new UnsupportedOperationException();
     }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/metastore/PartitionStats.java 
b/paimon-core/src/main/java/org/apache/paimon/metastore/PartitionStats.java
new file mode 100644
index 0000000000..eacc400f52
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/metastore/PartitionStats.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.metastore;
+
+/** Statistic for partition. */
+public interface PartitionStats {
+
+    long numFiles();
+
+    long totalSize();
+
+    long numRows();
+
+    long lastUpdateTimeMillis();
+
+    static PartitionStats create(
+            long numFiles, long totalSize, long numRows, long 
lastUpdateTimeMillis) {
+        return new PartitionStats() {
+
+            @Override
+            public long numFiles() {
+                return numFiles;
+            }
+
+            @Override
+            public long totalSize() {
+                return totalSize;
+            }
+
+            @Override
+            public long numRows() {
+                return numRows;
+            }
+
+            @Override
+            public long lastUpdateTimeMillis() {
+                return lastUpdateTimeMillis;
+            }
+
+            @Override
+            public String toString() {
+                return String.format(
+                        "numFiles: %s, totalSize: %s, numRows: %s, 
lastUpdateTimeMillis: %s",
+                        numFiles, totalSize, numRows, lastUpdateTimeMillis);
+            }
+        };
+    }
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/PartitionExpire.java 
b/paimon-core/src/main/java/org/apache/paimon/operation/PartitionExpire.java
index d432a37dfd..68ef8a1237 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/PartitionExpire.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/PartitionExpire.java
@@ -167,15 +167,13 @@ public class PartitionExpire {
     }
 
     private void deleteMetastorePartitions(List<Map<String, String>> 
partitions) {
-        if (metastoreClient != null) {
-            partitions.forEach(
-                    partition -> {
-                        try {
-                            metastoreClient.deletePartition(new 
LinkedHashMap<>(partition));
-                        } catch (Exception e) {
-                            throw new RuntimeException(e);
-                        }
-                    });
+        if (metastoreClient != null && partitions.size() > 0) {
+            try {
+                metastoreClient.dropPartitions(
+                        
partitions.stream().map(LinkedHashMap::new).collect(Collectors.toList()));
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
         }
     }
 
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/partition/actions/MarkPartitionDoneEventAction.java
 
b/paimon-core/src/main/java/org/apache/paimon/partition/actions/MarkPartitionDoneEventAction.java
index a5ebe34051..8cc1c93ba9 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/partition/actions/MarkPartitionDoneEventAction.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/partition/actions/MarkPartitionDoneEventAction.java
@@ -39,7 +39,7 @@ public class MarkPartitionDoneEventAction implements 
PartitionMarkDoneAction {
     public void markDone(String partition) throws Exception {
         LinkedHashMap<String, String> partitionSpec =
                 extractPartitionSpecFromPath(new Path(partition));
-        metastoreClient.markDone(partitionSpec);
+        metastoreClient.markPartitionDone(partitionSpec);
     }
 
     @Override
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
index 57966d24ce..7e008698c4 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
@@ -61,6 +61,7 @@ import 
org.apache.paimon.table.source.snapshot.StaticFromWatermarkStartingScanne
 import org.apache.paimon.table.source.snapshot.TimeTravelUtil;
 import org.apache.paimon.tag.TagPreview;
 import org.apache.paimon.utils.BranchManager;
+import org.apache.paimon.utils.InternalRowPartitionComputer;
 import org.apache.paimon.utils.Preconditions;
 import org.apache.paimon.utils.SegmentsCache;
 import org.apache.paimon.utils.SimpleFileReader;
@@ -469,7 +470,15 @@ abstract class AbstractFileStoreTable implements 
FileStoreTable {
         if (options.partitionedTableInMetastore()
                 && metastoreClientFactory != null
                 && !tableSchema.partitionKeys().isEmpty()) {
-            callbacks.add(new 
AddPartitionCommitCallback(metastoreClientFactory.create()));
+            InternalRowPartitionComputer partitionComputer =
+                    new InternalRowPartitionComputer(
+                            options.partitionDefaultName(),
+                            tableSchema.logicalPartitionType(),
+                            tableSchema.partitionKeys().toArray(new String[0]),
+                            options.legacyPartitionName());
+            callbacks.add(
+                    new AddPartitionCommitCallback(
+                            metastoreClientFactory.create(), 
partitionComputer));
         }
 
         TagPreview tagPreview = TagPreview.create(options);
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/operation/PartitionExpireTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/operation/PartitionExpireTest.java
index 931bac59c7..893fe1bf57 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/operation/PartitionExpireTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/operation/PartitionExpireTest.java
@@ -27,8 +27,12 @@ import org.apache.paimon.fs.local.LocalFileIO;
 import org.apache.paimon.io.CompactIncrement;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.io.DataIncrement;
+import org.apache.paimon.metastore.MetastoreClient;
+import org.apache.paimon.options.Options;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.CatalogEnvironment;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.FileStoreTableFactory;
 import org.apache.paimon.table.sink.CommitMessage;
@@ -54,6 +58,7 @@ import java.time.temporal.ChronoUnit;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
@@ -61,9 +66,11 @@ import java.util.concurrent.ThreadLocalRandom;
 
 import static java.util.Collections.emptyList;
 import static java.util.Collections.singletonList;
+import static org.apache.paimon.CoreOptions.METASTORE_PARTITIONED_TABLE;
 import static 
org.apache.paimon.CoreOptions.PARTITION_EXPIRATION_CHECK_INTERVAL;
 import static org.apache.paimon.CoreOptions.PARTITION_EXPIRATION_TIME;
 import static org.apache.paimon.CoreOptions.PARTITION_TIMESTAMP_FORMATTER;
+import static org.apache.paimon.CoreOptions.PATH;
 import static org.apache.paimon.CoreOptions.WRITE_ONLY;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -75,12 +82,54 @@ public class PartitionExpireTest {
 
     private Path path;
     private FileStoreTable table;
+    private List<LinkedHashMap<String, String>> deletedPartitions;
 
     @BeforeEach
     public void beforeEach() {
         path = new Path(tempDir.toUri());
     }
 
+    private void newTable() {
+        LocalFileIO fileIO = LocalFileIO.create();
+        Options options = new Options();
+        options.set(PATH, path.toString());
+        Path tablePath = CoreOptions.path(options);
+        String branchName = CoreOptions.branch(options.toMap());
+        TableSchema tableSchema = new SchemaManager(fileIO, tablePath, 
branchName).latest().get();
+        deletedPartitions = new ArrayList<>();
+        MetastoreClient.Factory factory =
+                () ->
+                        new MetastoreClient() {
+                            @Override
+                            public void addPartition(LinkedHashMap<String, 
String> partition) {}
+
+                            @Override
+                            public void addPartitions(
+                                    List<LinkedHashMap<String, String>> 
partitions) {}
+
+                            @Override
+                            public void dropPartition(LinkedHashMap<String, 
String> partition) {
+                                deletedPartitions.add(partition);
+                            }
+
+                            @Override
+                            public void dropPartitions(
+                                    List<LinkedHashMap<String, String>> 
partitions) {
+                                deletedPartitions.addAll(partitions);
+                            }
+
+                            @Override
+                            public void 
markPartitionDone(LinkedHashMap<String, String> partition) {
+                                throw new UnsupportedOperationException();
+                            }
+
+                            @Override
+                            public void close() {}
+                        };
+        CatalogEnvironment env = new CatalogEnvironment(null, null, 
Lock.emptyFactory(), factory);
+        table = FileStoreTableFactory.create(fileIO, path, tableSchema, env);
+    }
+
     @Test
     public void testNonPartitionedTable() {
         SchemaManager schemaManager = new SchemaManager(LocalFileIO.create(), 
path);
@@ -108,7 +157,7 @@ public class PartitionExpireTest {
                         emptyList(),
                         Collections.emptyMap(),
                         ""));
-        table = FileStoreTableFactory.create(LocalFileIO.create(), path);
+        newTable();
         write("20230101", "11");
         write("abcd", "12");
         write("20230101", "12");
@@ -129,9 +178,9 @@ public class PartitionExpireTest {
                         RowType.of(VarCharType.STRING_TYPE, 
VarCharType.STRING_TYPE).getFields(),
                         singletonList("f0"),
                         emptyList(),
-                        Collections.emptyMap(),
+                        
Collections.singletonMap(METASTORE_PARTITIONED_TABLE.key(), "true"),
                         ""));
-        table = FileStoreTableFactory.create(LocalFileIO.create(), path);
+        newTable();
 
         write("20230101", "11");
         write("20230101", "12");
@@ -156,6 +205,12 @@ public class PartitionExpireTest {
 
         expire.expire(date(8), Long.MAX_VALUE);
         assertThat(read()).isEmpty();
+
+        assertThat(deletedPartitions)
+                .containsExactlyInAnyOrder(
+                        new LinkedHashMap<>(Collections.singletonMap("f0", 
"20230101")),
+                        new LinkedHashMap<>(Collections.singletonMap("f0", 
"20230103")),
+                        new LinkedHashMap<>(Collections.singletonMap("f0", 
"20230105")));
     }
 
     @Test
@@ -169,7 +224,7 @@ public class PartitionExpireTest {
                         Collections.emptyMap(),
                         ""));
 
-        table = FileStoreTableFactory.create(LocalFileIO.create(), path);
+        newTable();
         // disable compaction and snapshot expiration
         table = table.copy(Collections.singletonMap(WRITE_ONLY.key(), "true"));
         String commitUser = UUID.randomUUID().toString();
@@ -243,7 +298,7 @@ public class PartitionExpireTest {
                         emptyList(),
                         Collections.emptyMap(),
                         ""));
-        table = FileStoreTableFactory.create(LocalFileIO.create(), path);
+        newTable();
         table = newExpireTable();
 
         List<CommitMessage> commitMessages = write("20230101", "11");
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionStatisticsReporter.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionStatisticsReporter.java
index b75889d567..ced37726f1 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionStatisticsReporter.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionStatisticsReporter.java
@@ -22,6 +22,7 @@ import org.apache.paimon.Snapshot;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.metastore.MetastoreClient;
+import org.apache.paimon.metastore.PartitionStats;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.source.DataSplit;
 import org.apache.paimon.table.source.ScanMode;
@@ -35,15 +36,9 @@ import org.slf4j.LoggerFactory;
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
-import java.util.Map;
 
-import static org.apache.paimon.catalog.Catalog.LAST_UPDATE_TIME_PROP;
-import static org.apache.paimon.catalog.Catalog.NUM_FILES_PROP;
-import static org.apache.paimon.catalog.Catalog.NUM_ROWS_PROP;
-import static org.apache.paimon.catalog.Catalog.TOTAL_SIZE_PROP;
 import static 
org.apache.paimon.utils.PartitionPathUtils.extractPartitionSpecFromPath;
 
 /** Action to report the table statistic from the latest snapshot to HMS. */
@@ -51,8 +46,6 @@ public class PartitionStatisticsReporter implements Closeable 
{
 
     private static final Logger LOG = 
LoggerFactory.getLogger(PartitionStatisticsReporter.class);
 
-    private static final String HIVE_LAST_UPDATE_TIME_PROP = 
"transient_lastDdlTime";
-
     private final MetastoreClient metastoreClient;
     private final SnapshotReader snapshotReader;
     private final SnapshotManager snapshotManager;
@@ -64,7 +57,7 @@ public class PartitionStatisticsReporter implements Closeable 
{
         this.snapshotManager = table.snapshotManager();
     }
 
-    public void report(String partition, long modifyTime) throws Exception {
+    public void report(String partition, long modifyTimeMillis) throws 
Exception {
         Snapshot snapshot = snapshotManager.latestSnapshot();
         if (snapshot != null) {
             LinkedHashMap<String, String> partitionSpec =
@@ -88,19 +81,11 @@ public class PartitionStatisticsReporter implements 
Closeable {
                     totalSize += fileMeta.fileSize();
                 }
             }
-            Map<String, String> statistic = new HashMap<>();
-            statistic.put(NUM_FILES_PROP, String.valueOf(fileCount));
-            statistic.put(TOTAL_SIZE_PROP, String.valueOf(totalSize));
-            statistic.put(NUM_ROWS_PROP, String.valueOf(rowCount));
-
-            String modifyTimeSeconds = String.valueOf(modifyTime / 1000);
-            statistic.put(LAST_UPDATE_TIME_PROP, modifyTimeSeconds);
-
-            // just for being compatible with hive metastore
-            statistic.put(HIVE_LAST_UPDATE_TIME_PROP, modifyTimeSeconds);
 
-            LOG.info("alter partition {} with statistic {}.", partitionSpec, 
statistic);
-            metastoreClient.alterPartition(partitionSpec, statistic, 
modifyTime, true);
+            PartitionStats partitionStats =
+                    PartitionStats.create(fileCount, totalSize, rowCount, 
modifyTimeMillis);
+            LOG.info("alter partition {} with statistic {}.", partitionSpec, 
partitionStats);
+            metastoreClient.alterPartition(partitionSpec, partitionStats);
         }
     }
 
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/AddDonePartitionActionTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/AddDonePartitionActionTest.java
index fca5dcf0ed..3bdbdd20ad 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/AddDonePartitionActionTest.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/AddDonePartitionActionTest.java
@@ -18,15 +18,15 @@
 
 package org.apache.paimon.flink.sink.partition;
 
-import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.metastore.MetastoreClient;
+import org.apache.paimon.metastore.PartitionStats;
 import org.apache.paimon.partition.actions.AddDonePartitionAction;
 
 import org.junit.jupiter.api.Test;
 
 import java.util.HashSet;
 import java.util.LinkedHashMap;
-import java.util.Map;
+import java.util.List;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -41,39 +41,41 @@ class AddDonePartitionActionTest {
         Set<String> donePartitions = new HashSet<>();
         MetastoreClient metastoreClient =
                 new MetastoreClient() {
+
                     @Override
-                    public void addPartition(BinaryRow partition) {
-                        throw new UnsupportedOperationException();
+                    public void addPartition(LinkedHashMap<String, String> 
partition) {
+                        donePartitions.add(generatePartitionPath(partition));
+                    }
+
+                    @Override
+                    public void addPartitions(List<LinkedHashMap<String, 
String>> partitions) {
+                        partitions.forEach(this::addPartition);
                     }
 
                     @Override
-                    public void addPartition(LinkedHashMap<String, String> 
partitionSpec) {
-                        
donePartitions.add(generatePartitionPath(partitionSpec));
+                    public void dropPartition(LinkedHashMap<String, String> 
partition) {
+                        throw new UnsupportedOperationException();
                     }
 
                     @Override
-                    public void deletePartition(LinkedHashMap<String, String> 
partitionSpec) {
+                    public void dropPartitions(List<LinkedHashMap<String, 
String>> partitions) {
                         throw new UnsupportedOperationException();
                     }
 
                     @Override
-                    public void markDone(LinkedHashMap<String, String> 
partitionSpec)
-                            throws Exception {
+                    public void markPartitionDone(LinkedHashMap<String, 
String> partitions) {
                         throw new UnsupportedOperationException();
                     }
 
                     @Override
                     public void alterPartition(
                             LinkedHashMap<String, String> partitionSpec,
-                            Map<String, String> parameters,
-                            long modifyTime,
-                            boolean ignoreIfNotExist)
-                            throws Exception {
+                            PartitionStats partitionStats) {
                         throw new UnsupportedOperationException();
                     }
 
                     @Override
-                    public void close() throws Exception {
+                    public void close() {
                         closed.set(true);
                     }
                 };
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionStatisticsReporterTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionStatisticsReporterTest.java
index 142a0c32f7..0f761efa22 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionStatisticsReporterTest.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionStatisticsReporterTest.java
@@ -18,12 +18,12 @@
 
 package org.apache.paimon.flink.sink.partition;
 
-import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.fs.local.LocalFileIO;
 import org.apache.paimon.metastore.MetastoreClient;
+import org.apache.paimon.metastore.PartitionStats;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.table.FileStoreTable;
@@ -35,7 +35,6 @@ import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.utils.PartitionPathUtils;
 
-import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap;
 import org.apache.paimon.shade.guava30.com.google.common.collect.Lists;
 import org.apache.paimon.shade.guava30.com.google.common.collect.Maps;
 
@@ -86,47 +85,47 @@ public class PartitionStatisticsReporterTest {
         BatchTableCommit committer = table.newBatchWriteBuilder().newCommit();
         committer.commit(messages);
         AtomicBoolean closed = new AtomicBoolean(false);
-        Map<String, Map<String, String>> partitionParams = Maps.newHashMap();
+        Map<String, PartitionStats> partitionParams = Maps.newHashMap();
 
         MetastoreClient client =
                 new MetastoreClient() {
+
+                    @Override
+                    public void addPartition(LinkedHashMap<String, String> 
partition) {
+                        throw new UnsupportedOperationException();
+                    }
+
                     @Override
-                    public void addPartition(BinaryRow partition) throws 
Exception {
+                    public void addPartitions(List<LinkedHashMap<String, 
String>> partitions) {
                         throw new UnsupportedOperationException();
                     }
 
                     @Override
-                    public void addPartition(LinkedHashMap<String, String> 
partitionSpec)
-                            throws Exception {
+                    public void dropPartition(LinkedHashMap<String, String> 
partition) {
                         throw new UnsupportedOperationException();
                     }
 
                     @Override
-                    public void deletePartition(LinkedHashMap<String, String> 
partitionSpec)
-                            throws Exception {
+                    public void dropPartitions(List<LinkedHashMap<String, 
String>> partitions) {
                         throw new UnsupportedOperationException();
                     }
 
                     @Override
-                    public void markDone(LinkedHashMap<String, String> 
partitionSpec)
-                            throws Exception {
+                    public void markPartitionDone(LinkedHashMap<String, 
String> partitionSpec) {
                         throw new UnsupportedOperationException();
                     }
 
                     @Override
                     public void alterPartition(
                             LinkedHashMap<String, String> partitionSpec,
-                            Map<String, String> parameters,
-                            long modifyTime,
-                            boolean ignoreIfNotExist)
-                            throws Exception {
+                            PartitionStats partitionStats) {
                         partitionParams.put(
                                 
PartitionPathUtils.generatePartitionPath(partitionSpec),
-                                parameters);
+                                partitionStats);
                     }
 
                     @Override
-                    public void close() throws Exception {
+                    public void close() {
                         closed.set(true);
                     }
                 };
@@ -135,19 +134,9 @@ public class PartitionStatisticsReporterTest {
         long time = 1729598544974L;
         action.report("c1=a/", time);
         Assertions.assertThat(partitionParams).containsKey("c1=a/");
-        Assertions.assertThat(partitionParams.get("c1=a/"))
+        Assertions.assertThat(partitionParams.get("c1=a/").toString())
                 .isEqualTo(
-                        ImmutableMap.of(
-                                "numFiles",
-                                "1",
-                                "totalSize",
-                                "591",
-                                "numRows",
-                                "1",
-                                "lastUpdateTime",
-                                String.valueOf(time / 1000),
-                                "transient_lastDdlTime",
-                                String.valueOf(time / 1000)));
+                        "numFiles: 1, totalSize: 591, numRows: 1, 
lastUpdateTimeMillis: 1729598544974");
         action.close();
         Assertions.assertThat(closed).isTrue();
     }
diff --git 
a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
 
b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
index 5744ac894d..f5ae504850 100644
--- 
a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
+++ 
b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
@@ -27,7 +27,6 @@ import org.apache.paimon.catalog.CatalogLockContext;
 import org.apache.paimon.catalog.CatalogLockFactory;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.client.ClientPool;
-import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.hive.pool.CachedClientPool;
@@ -48,6 +47,7 @@ import org.apache.paimon.table.FormatTable;
 import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.InternalRowPartitionComputer;
 import org.apache.paimon.utils.Pair;
 import org.apache.paimon.utils.Preconditions;
 import org.apache.paimon.view.View;
@@ -191,13 +191,12 @@ public class HiveCatalog extends AbstractCatalog {
     }
 
     @Override
-    public Optional<MetastoreClient.Factory> metastoreClientFactory(
-            Identifier identifier, TableSchema schema) {
+    public Optional<MetastoreClient.Factory> metastoreClientFactory(Identifier 
identifier) {
         Identifier tableIdentifier =
                 new Identifier(identifier.getDatabaseName(), 
identifier.getTableName());
         return Optional.of(
                 new HiveMetastoreClient.Factory(
-                        tableIdentifier, schema, hiveConf, clientClassName, 
options));
+                        tableIdentifier, hiveConf, clientClassName, options));
     }
 
     @Override
@@ -350,9 +349,8 @@ public class HiveCatalog extends AbstractCatalog {
                         new HiveMetastoreClient(
                                 new Identifier(
                                         identifier.getDatabaseName(), 
identifier.getTableName()),
-                                tableSchema,
                                 clients);
-                metastoreClient.deletePartition(new 
LinkedHashMap<>(partitionSpec));
+                metastoreClient.dropPartition(new 
LinkedHashMap<>(partitionSpec));
             } catch (Exception e) {
                 throw new RuntimeException(e);
             }
@@ -610,7 +608,7 @@ public class HiveCatalog extends AbstractCatalog {
                                     lockFactory().orElse(null),
                                     lockContext().orElse(null),
                                     identifier),
-                            metastoreClientFactory(identifier, 
tableMeta.schema()).orElse(null)));
+                            metastoreClientFactory(identifier).orElse(null)));
         } catch (TableNotExistException ignore) {
         }
 
@@ -968,14 +966,19 @@ public class HiveCatalog extends AbstractCatalog {
             // repair partitions
             if (!tableSchema.partitionKeys().isEmpty() && 
!newTable.getPartitionKeys().isEmpty()) {
                 // Do not close client, it is for HiveCatalog
+                CoreOptions options = new CoreOptions(tableSchema.options());
+                InternalRowPartitionComputer partitionComputer =
+                        new InternalRowPartitionComputer(
+                                options.partitionDefaultName(),
+                                tableSchema.logicalPartitionType(),
+                                tableSchema.partitionKeys().toArray(new 
String[0]),
+                                options.legacyPartitionName());
                 @SuppressWarnings("resource")
-                HiveMetastoreClient metastoreClient =
-                        new HiveMetastoreClient(identifier, tableSchema, 
clients);
-                List<BinaryRow> partitions =
-                        
getTable(identifier).newReadBuilder().newScan().listPartitions();
-                for (BinaryRow partition : partitions) {
-                    metastoreClient.addPartition(partition);
-                }
+                HiveMetastoreClient metastoreClient = new 
HiveMetastoreClient(identifier, clients);
+                metastoreClient.addPartitions(
+                        
getTable(identifier).newReadBuilder().newScan().listPartitions().stream()
+                                .map(partitionComputer::generatePartValues)
+                                .collect(Collectors.toList()));
             }
         } catch (Exception e) {
             throw new RuntimeException(e);
diff --git 
a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveMetastoreClient.java
 
b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveMetastoreClient.java
index 885fa463e5..f7be538c25 100644
--- 
a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveMetastoreClient.java
+++ 
b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveMetastoreClient.java
@@ -18,15 +18,12 @@
 
 package org.apache.paimon.hive;
 
-import org.apache.paimon.CoreOptions;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.client.ClientPool;
-import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.hive.pool.CachedClientPool;
 import org.apache.paimon.metastore.MetastoreClient;
+import org.apache.paimon.metastore.PartitionStats;
 import org.apache.paimon.options.Options;
-import org.apache.paimon.schema.TableSchema;
-import org.apache.paimon.utils.InternalRowPartitionComputer;
 import org.apache.paimon.utils.PartitionPathUtils;
 
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -39,34 +36,30 @@ import 
org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.thrift.TException;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 
+import static org.apache.paimon.catalog.Catalog.LAST_UPDATE_TIME_PROP;
+import static org.apache.paimon.catalog.Catalog.NUM_FILES_PROP;
+import static org.apache.paimon.catalog.Catalog.NUM_ROWS_PROP;
+import static org.apache.paimon.catalog.Catalog.TOTAL_SIZE_PROP;
+
 /** {@link MetastoreClient} for Hive tables. */
 public class HiveMetastoreClient implements MetastoreClient {
 
+    private static final String HIVE_LAST_UPDATE_TIME_PROP = 
"transient_lastDdlTime";
+
     private final Identifier identifier;
-    private final InternalRowPartitionComputer partitionComputer;
 
     private final ClientPool<IMetaStoreClient, TException> clients;
     private final StorageDescriptor sd;
 
-    HiveMetastoreClient(
-            Identifier identifier,
-            TableSchema schema,
-            ClientPool<IMetaStoreClient, TException> clients)
+    HiveMetastoreClient(Identifier identifier, ClientPool<IMetaStoreClient, 
TException> clients)
             throws TException, InterruptedException {
         this.identifier = identifier;
-        CoreOptions options = new CoreOptions(schema.options());
-        this.partitionComputer =
-                new InternalRowPartitionComputer(
-                        options.partitionDefaultName(),
-                        schema.logicalPartitionType(),
-                        schema.partitionKeys().toArray(new String[0]),
-                        options.legacyPartitionName());
-
         this.clients = clients;
         this.sd =
                 this.clients
@@ -79,22 +72,9 @@ public class HiveMetastoreClient implements MetastoreClient {
     }
 
     @Override
-    public void addPartition(BinaryRow partition) throws Exception {
-        addPartition(partitionComputer.generatePartValues(partition));
-    }
-
-    @Override
-    public void addPartitions(List<BinaryRow> partitions) throws Exception {
-        addPartitionsSpec(
-                partitions.stream()
-                        .map(partitionComputer::generatePartValues)
-                        .collect(Collectors.toList()));
-    }
-
-    @Override
-    public void addPartition(LinkedHashMap<String, String> partitionSpec) 
throws Exception {
+    public void addPartition(LinkedHashMap<String, String> partition) throws 
Exception {
         Partition hivePartition =
-                toHivePartition(partitionSpec, (int) 
(System.currentTimeMillis() / 1000));
+                toHivePartition(partition, (int) (System.currentTimeMillis() / 
1000));
         clients.execute(
                 client -> {
                     try {
@@ -105,11 +85,10 @@ public class HiveMetastoreClient implements 
MetastoreClient {
     }
 
     @Override
-    public void addPartitionsSpec(List<LinkedHashMap<String, String>> 
partitionSpecsList)
-            throws Exception {
+    public void addPartitions(List<LinkedHashMap<String, String>> partitions) 
throws Exception {
         int currentTime = (int) (System.currentTimeMillis() / 1000);
         List<Partition> hivePartitions =
-                partitionSpecsList.stream()
+                partitions.stream()
                         .map(partitionSpec -> toHivePartition(partitionSpec, 
currentTime))
                         .collect(Collectors.toList());
         clients.execute(client -> client.add_partitions(hivePartitions, true, 
false));
@@ -117,43 +96,45 @@ public class HiveMetastoreClient implements 
MetastoreClient {
 
     @Override
     public void alterPartition(
-            LinkedHashMap<String, String> partitionSpec,
-            Map<String, String> parameters,
-            long modifyTime,
-            boolean ignoreIfNotExist)
+            LinkedHashMap<String, String> partition, PartitionStats 
partitionStats)
             throws Exception {
-        List<String> partitionValues = new ArrayList<>(partitionSpec.values());
-        int currentTime = (int) (modifyTime / 1000);
-        Partition hivePartition;
+        List<String> partitionValues = new ArrayList<>(partition.values());
+
+        Map<String, String> statistic = new HashMap<>();
+        statistic.put(NUM_FILES_PROP, 
String.valueOf(partitionStats.numFiles()));
+        statistic.put(TOTAL_SIZE_PROP, 
String.valueOf(partitionStats.totalSize()));
+        statistic.put(NUM_ROWS_PROP, String.valueOf(partitionStats.numRows()));
+
+        String modifyTimeSeconds = 
String.valueOf(partitionStats.lastUpdateTimeMillis() / 1000);
+        statistic.put(LAST_UPDATE_TIME_PROP, modifyTimeSeconds);
+
+        // just for being compatible with hive metastore
+        statistic.put(HIVE_LAST_UPDATE_TIME_PROP, modifyTimeSeconds);
+
         try {
-            hivePartition =
+            Partition hivePartition =
                     clients.run(
                             client ->
                                     client.getPartition(
                                             identifier.getDatabaseName(),
                                             identifier.getObjectName(),
                                             partitionValues));
+            hivePartition.setValues(partitionValues);
+            hivePartition.setLastAccessTime((int) 
(partitionStats.lastUpdateTimeMillis() / 1000));
+            hivePartition.getParameters().putAll(statistic);
+            clients.execute(
+                    client ->
+                            client.alter_partition(
+                                    identifier.getDatabaseName(),
+                                    identifier.getObjectName(),
+                                    hivePartition));
         } catch (NoSuchObjectException e) {
-            if (ignoreIfNotExist) {
-                return;
-            } else {
-                throw e;
-            }
+            // do nothing if the partition not exists
         }
-
-        hivePartition.setValues(partitionValues);
-        hivePartition.setLastAccessTime(currentTime);
-        hivePartition.getParameters().putAll(parameters);
-        clients.execute(
-                client ->
-                        client.alter_partition(
-                                identifier.getDatabaseName(),
-                                identifier.getObjectName(),
-                                hivePartition));
     }
 
     @Override
-    public void deletePartition(LinkedHashMap<String, String> partitionSpec) 
throws Exception {
+    public void dropPartition(LinkedHashMap<String, String> partitionSpec) 
throws Exception {
         List<String> partitionValues = new ArrayList<>(partitionSpec.values());
         try {
             clients.execute(
@@ -169,7 +150,14 @@ public class HiveMetastoreClient implements 
MetastoreClient {
     }
 
     @Override
-    public void markDone(LinkedHashMap<String, String> partitionSpec) throws 
Exception {
+    public void dropPartitions(List<LinkedHashMap<String, String>> partitions) 
throws Exception {
+        for (LinkedHashMap<String, String> partition : partitions) {
+            dropPartition(partition);
+        }
+    }
+
+    @Override
+    public void markPartitionDone(LinkedHashMap<String, String> partitionSpec) 
throws Exception {
         try {
             clients.execute(
                     client ->
@@ -213,19 +201,13 @@ public class HiveMetastoreClient implements 
MetastoreClient {
         private static final long serialVersionUID = 1L;
 
         private final Identifier identifier;
-        private final TableSchema schema;
         private final SerializableHiveConf hiveConf;
         private final String clientClassName;
         private final Options options;
 
         public Factory(
-                Identifier identifier,
-                TableSchema schema,
-                HiveConf hiveConf,
-                String clientClassName,
-                Options options) {
+                Identifier identifier, HiveConf hiveConf, String 
clientClassName, Options options) {
             this.identifier = identifier;
-            this.schema = schema;
             this.hiveConf = new SerializableHiveConf(hiveConf);
             this.clientClassName = clientClassName;
             this.options = options;
@@ -236,7 +218,7 @@ public class HiveMetastoreClient implements MetastoreClient 
{
             HiveConf conf = hiveConf.conf();
             try {
                 return new HiveMetastoreClient(
-                        identifier, schema, new CachedClientPool(conf, 
options, clientClassName));
+                        identifier, new CachedClientPool(conf, options, 
clientClassName));
             } catch (TException e) {
                 throw new RuntimeException(
                         "Can not get table " + identifier + " info from 
metastore.", e);
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionManagement.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionManagement.scala
index 840f1341a6..c385f243ae 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionManagement.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionManagement.scala
@@ -78,7 +78,7 @@ trait PaimonPartitionManagement extends 
SupportsAtomicPartitionManagement {
           // sync to metastore with delete partitions
           if (clientFactory != null && 
fileStoreTable.coreOptions().partitionedTableInMetastore()) {
             metastoreClient = clientFactory.create()
-            toPaimonPartitions(rows).foreach(metastoreClient.deletePartition)
+            
metastoreClient.dropPartitions(toPaimonPartitions(rows).toSeq.asJava)
           }
         } finally {
           commit.close()

Reply via email to