This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 03abd9441 [core] Add batch add partitions in MetastoreClient (#4480)
03abd9441 is described below

commit 03abd9441acac41d64d0ed7835cacc0122767342
Author: Zouxxyy <[email protected]>
AuthorDate: Fri Nov 8 15:35:18 2024 +0800

    [core] Add batch add partitions in MetastoreClient (#4480)
---
 .../metastore/AddPartitionCommitCallback.java      | 40 +++++++++++++----
 .../apache/paimon/metastore/MetastoreClient.java   | 14 ++++++
 .../apache/paimon/hive/HiveMetastoreClient.java    | 52 +++++++++++++++-------
 .../apache/paimon/spark/PaimonSparkTestBase.scala  | 24 ++++------
 .../org/apache/paimon/spark/sql/DDLTestBase.scala  |  8 ++--
 .../spark/sql/DDLWithHiveCatalogTestBase.scala     | 45 +++++++++++++++++++
 6 files changed, 140 insertions(+), 43 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/metastore/AddPartitionCommitCallback.java b/paimon-core/src/main/java/org/apache/paimon/metastore/AddPartitionCommitCallback.java
index 4f7d3d554..06002161a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/metastore/AddPartitionCommitCallback.java
+++ b/paimon-core/src/main/java/org/apache/paimon/metastore/AddPartitionCommitCallback.java
@@ -30,7 +30,10 @@ import org.apache.paimon.shade.guava30.com.google.common.cache.Cache;
 import org.apache.paimon.shade.guava30.com.google.common.cache.CacheBuilder;
 
 import java.time.Duration;
+import java.util.ArrayList;
 import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
 
 /** A {@link CommitCallback} to add newly created partitions to metastore. */
 public class AddPartitionCommitCallback implements CommitCallback {
@@ -52,19 +55,21 @@ public class AddPartitionCommitCallback implements CommitCallback {
 
     @Override
     public void call(List<ManifestEntry> committedEntries, Snapshot snapshot) {
-        committedEntries.stream()
-                .filter(e -> FileKind.ADD.equals(e.kind()))
-                .map(ManifestEntry::partition)
-                .distinct()
-                .forEach(this::addPartition);
+        Set<BinaryRow> partitions =
+                committedEntries.stream()
+                        .filter(e -> FileKind.ADD.equals(e.kind()))
+                        .map(ManifestEntry::partition)
+                        .collect(Collectors.toSet());
+        addPartitions(partitions);
     }
 
     @Override
     public void retry(ManifestCommittable committable) {
-        committable.fileCommittables().stream()
-                .map(CommitMessage::partition)
-                .distinct()
-                .forEach(this::addPartition);
+        Set<BinaryRow> partitions =
+                committable.fileCommittables().stream()
+                        .map(CommitMessage::partition)
+                        .collect(Collectors.toSet());
+        addPartitions(partitions);
     }
 
     private void addPartition(BinaryRow partition) {
@@ -81,6 +86,23 @@ public class AddPartitionCommitCallback implements CommitCallback {
         }
     }
 
+    private void addPartitions(Set<BinaryRow> partitions) {
+        try {
+            List<BinaryRow> newPartitions = new ArrayList<>();
+            for (BinaryRow partition : partitions) {
+                if (!cache.get(partition, () -> false)) {
+                    newPartitions.add(partition);
+                }
+            }
+            if (!newPartitions.isEmpty()) {
+                client.addPartitions(newPartitions);
+                newPartitions.forEach(partition -> cache.put(partition, true));
+            }
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
     @Override
     public void close() throws Exception {
         client.close();
diff --git a/paimon-core/src/main/java/org/apache/paimon/metastore/MetastoreClient.java b/paimon-core/src/main/java/org/apache/paimon/metastore/MetastoreClient.java
index ac12bfc73..60e28c59f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/metastore/MetastoreClient.java
+++ b/paimon-core/src/main/java/org/apache/paimon/metastore/MetastoreClient.java
@@ -22,6 +22,7 @@ import org.apache.paimon.data.BinaryRow;
 
 import java.io.Serializable;
 import java.util.LinkedHashMap;
+import java.util.List;
 import java.util.Map;
 
 /**
@@ -32,8 +33,21 @@ public interface MetastoreClient extends AutoCloseable {
 
     void addPartition(BinaryRow partition) throws Exception;
 
+    default void addPartitions(List<BinaryRow> partitions) throws Exception {
+        for (BinaryRow partition : partitions) {
+            addPartition(partition);
+        }
+    }
+
     void addPartition(LinkedHashMap<String, String> partitionSpec) throws 
Exception;
 
+    default void addPartitionsSpec(List<LinkedHashMap<String, String>> 
partitionSpecsList)
+            throws Exception {
+        for (LinkedHashMap<String, String> partitionSpecs : 
partitionSpecsList) {
+            addPartition(partitionSpecs);
+        }
+    }
+
     void deletePartition(LinkedHashMap<String, String> partitionSpec) throws 
Exception;
 
     void markDone(LinkedHashMap<String, String> partitionSpec) throws 
Exception;
diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveMetastoreClient.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveMetastoreClient.java
index 5856515bb..cb70e0191 100644
--- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveMetastoreClient.java
+++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveMetastoreClient.java
@@ -41,6 +41,7 @@ import java.util.ArrayList;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 
 /** {@link MetastoreClient} for Hive tables. */
 public class HiveMetastoreClient implements MetastoreClient {
@@ -81,6 +82,14 @@ public class HiveMetastoreClient implements MetastoreClient {
         addPartition(partitionComputer.generatePartValues(partition));
     }
 
+    @Override
+    public void addPartitions(List<BinaryRow> partitions) throws Exception {
+        addPartitionsSpec(
+                partitions.stream()
+                        .map(partitionComputer::generatePartValues)
+                        .collect(Collectors.toList()));
+    }
+
     @Override
     public void addPartition(LinkedHashMap<String, String> partitionSpec) 
throws Exception {
         List<String> partitionValues = new ArrayList<>(partitionSpec.values());
@@ -94,25 +103,23 @@ public class HiveMetastoreClient implements MetastoreClient {
             // do nothing if the partition already exists
         } catch (NoSuchObjectException e) {
             // partition not found, create new partition
-            StorageDescriptor newSd = new StorageDescriptor(sd);
-            newSd.setLocation(
-                    sd.getLocation()
-                            + "/"
-                            + 
PartitionPathUtils.generatePartitionPath(partitionSpec));
-
-            Partition hivePartition = new Partition();
-            hivePartition.setDbName(identifier.getDatabaseName());
-            hivePartition.setTableName(identifier.getTableName());
-            hivePartition.setValues(partitionValues);
-            hivePartition.setSd(newSd);
-            int currentTime = (int) (System.currentTimeMillis() / 1000);
-            hivePartition.setCreateTime(currentTime);
-            hivePartition.setLastAccessTime(currentTime);
-
+            Partition hivePartition =
+                    toHivePartition(partitionSpec, (int) 
(System.currentTimeMillis() / 1000));
             clients.execute(client -> client.add_partition(hivePartition));
         }
     }
 
+    @Override
+    public void addPartitionsSpec(List<LinkedHashMap<String, String>> 
partitionSpecsList)
+            throws Exception {
+        int currentTime = (int) (System.currentTimeMillis() / 1000);
+        List<Partition> hivePartitions =
+                partitionSpecsList.stream()
+                        .map(partitionSpec -> toHivePartition(partitionSpec, 
currentTime))
+                        .collect(Collectors.toList());
+        clients.execute(client -> client.add_partitions(hivePartitions, true, 
false));
+    }
+
     @Override
     public void alterPartition(
             LinkedHashMap<String, String> partitionSpec,
@@ -179,6 +186,21 @@ public class HiveMetastoreClient implements MetastoreClient {
         return clients.run(client -> client);
     }
 
+    private Partition toHivePartition(
+            LinkedHashMap<String, String> partitionSpec, int currentTime) {
+        Partition hivePartition = new Partition();
+        StorageDescriptor newSd = new StorageDescriptor(sd);
+        newSd.setLocation(
+                sd.getLocation() + "/" + 
PartitionPathUtils.generatePartitionPath(partitionSpec));
+        hivePartition.setDbName(identifier.getDatabaseName());
+        hivePartition.setTableName(identifier.getTableName());
+        hivePartition.setValues(new ArrayList<>(partitionSpec.values()));
+        hivePartition.setSd(newSd);
+        hivePartition.setCreateTime(currentTime);
+        hivePartition.setLastAccessTime(currentTime);
+        return hivePartition;
+    }
+
     /** Factory to create {@link HiveMetastoreClient}. */
     public static class Factory implements MetastoreClient.Factory {
 
diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonSparkTestBase.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonSparkTestBase.scala
index 3deb91cbc..9b4a34425 100644
--- a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonSparkTestBase.scala
+++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonSparkTestBase.scala
@@ -18,9 +18,8 @@
 
 package org.apache.paimon.spark
 
-import org.apache.paimon.catalog.{Catalog, CatalogContext, CatalogFactory, 
Identifier}
-import org.apache.paimon.options.{CatalogOptions, Options}
-import org.apache.paimon.spark.catalog.Catalogs
+import org.apache.paimon.catalog.{Catalog, Identifier}
+import org.apache.paimon.spark.catalog.WithPaimonCatalog
 import org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions
 import org.apache.paimon.spark.sql.{SparkVersionSupport, WithTableOptions}
 import org.apache.paimon.table.FileStoreTable
@@ -36,7 +35,6 @@ import org.scalactic.source.Position
 import org.scalatest.Tag
 
 import java.io.File
-import java.util.{HashMap => JHashMap}
 import java.util.TimeZone
 
 import scala.util.Random
@@ -49,7 +47,9 @@ class PaimonSparkTestBase
 
   protected lazy val tempDBDir: File = Utils.createTempDir
 
-  protected lazy val catalog: Catalog = initCatalog()
+  protected def paimonCatalog: Catalog = {
+    
spark.sessionState.catalogManager.currentCatalog.asInstanceOf[WithPaimonCatalog].paimonCatalog()
+  }
 
   protected val dbName0: String = "test"
 
@@ -122,18 +122,12 @@ class PaimonSparkTestBase
     super.test(testName, testTags: _*)(testFun)(pos)
   }
 
-  private def initCatalog(): Catalog = {
-    val currentCatalog = 
spark.sessionState.catalogManager.currentCatalog.name()
-    val options =
-      new JHashMap[String, String](Catalogs.catalogOptions(currentCatalog, 
spark.sessionState.conf))
-    options.put(CatalogOptions.CACHE_ENABLED.key(), "false")
-    val catalogContext =
-      CatalogContext.create(Options.fromMap(options), 
spark.sessionState.newHadoopConf())
-    CatalogFactory.createCatalog(catalogContext)
+  def loadTable(tableName: String): FileStoreTable = {
+    loadTable(dbName0, tableName)
   }
 
-  def loadTable(tableName: String): FileStoreTable = {
-    catalog.getTable(Identifier.create(dbName0, 
tableName)).asInstanceOf[FileStoreTable]
+  def loadTable(dbName: String, tableName: String): FileStoreTable = {
+    paimonCatalog.getTable(Identifier.create(dbName, 
tableName)).asInstanceOf[FileStoreTable]
   }
 
   protected def createRelationV2(tableName: String): LogicalPlan = {
diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DDLTestBase.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DDLTestBase.scala
index 0006e45ec..cf1a71d51 100644
--- a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DDLTestBase.scala
+++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DDLTestBase.scala
@@ -352,7 +352,7 @@ abstract class DDLTestBase extends PaimonSparkTestBase {
           .column("ts", DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE())
           .column("ts_ntz", DataTypes.TIMESTAMP())
           .build
-        catalog.createTable(identifier, schema, false)
+        paimonCatalog.createTable(identifier, schema, false)
         sql(
           s"INSERT INTO paimon_tbl VALUES (timestamp'2024-01-01 00:00:00', 
timestamp_ntz'2024-01-01 00:00:00')")
 
@@ -370,7 +370,7 @@ abstract class DDLTestBase extends PaimonSparkTestBase {
         // Due to previous design, read timestamp ltz type with spark 3.3 and 
below will cause problems,
         // skip testing it
         if (gteqSpark3_4) {
-          val table = catalog.getTable(identifier)
+          val table = paimonCatalog.getTable(identifier)
           val builder = table.newReadBuilder.withProjection(Array[Int](0, 1))
           val splits = builder.newScan().plan().splits()
           builder.newRead
@@ -405,7 +405,7 @@ abstract class DDLTestBase extends PaimonSparkTestBase {
           // Due to previous design, read timestamp ltz type with spark 3.3 
and below will cause problems,
           // skip testing it
           if (gteqSpark3_4) {
-            val table = catalog.getTable(identifier)
+            val table = paimonCatalog.getTable(identifier)
             val builder = table.newReadBuilder.withProjection(Array[Int](0, 1))
             val splits = builder.newScan().plan().splits()
             builder.newRead
@@ -423,7 +423,7 @@ abstract class DDLTestBase extends PaimonSparkTestBase {
         }
       }
     } finally {
-      catalog.dropTable(identifier, true)
+      paimonCatalog.dropTable(identifier, true)
     }
   }
 
diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DDLWithHiveCatalogTestBase.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DDLWithHiveCatalogTestBase.scala
index fc1d855ec..7478f9628 100644
--- a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DDLWithHiveCatalogTestBase.scala
+++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DDLWithHiveCatalogTestBase.scala
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.spark.sql
 
+import org.apache.paimon.hive.HiveMetastoreClient
 import org.apache.paimon.spark.PaimonHiveTestBase
 import org.apache.paimon.table.FileStoreTable
 
@@ -251,6 +252,50 @@ abstract class DDLWithHiveCatalogTestBase extends PaimonHiveTestBase {
     }
   }
 
+  test("Paimon DDL with hive catalog: sync partitions to HMS") {
+    Seq(sparkCatalogName, paimonHiveCatalogName).foreach {
+      catalogName =>
+        val dbName = "default"
+        val tblName = "t"
+        spark.sql(s"USE $catalogName.$dbName")
+        withTable(tblName) {
+          spark.sql(s"""
+                       |CREATE TABLE $tblName (id INT, pt INT)
+                       |USING PAIMON
+                       |TBLPROPERTIES ('metastore.partitioned-table' = 'true')
+                       |PARTITIONED BY (pt)
+                       |""".stripMargin)
+
+          val metastoreClient = loadTable(dbName, tblName)
+            .catalogEnvironment()
+            .metastoreClientFactory()
+            .create()
+            .asInstanceOf[HiveMetastoreClient]
+            .client()
+
+          spark.sql(s"INSERT INTO $tblName VALUES (1, 1), (2, 2), (3, 3)")
+          // check partitions in paimon
+          checkAnswer(
+            spark.sql(s"show partitions $tblName"),
+            Seq(Row("pt=1"), Row("pt=2"), Row("pt=3")))
+          // check partitions in HMS
+          assert(metastoreClient.listPartitions(dbName, tblName, 100).size() 
== 3)
+
+          spark.sql(s"INSERT INTO $tblName VALUES (4, 3), (5, 4)")
+          checkAnswer(
+            spark.sql(s"show partitions $tblName"),
+            Seq(Row("pt=1"), Row("pt=2"), Row("pt=3"), Row("pt=4")))
+          assert(metastoreClient.listPartitions(dbName, tblName, 100).size() 
== 4)
+
+          spark.sql(s"ALTER TABLE $tblName DROP PARTITION (pt=1)")
+          checkAnswer(
+            spark.sql(s"show partitions $tblName"),
+            Seq(Row("pt=2"), Row("pt=3"), Row("pt=4")))
+          assert(metastoreClient.listPartitions(dbName, tblName, 100).size() 
== 3)
+        }
+    }
+  }
+
   def getDatabaseLocation(dbName: String): String = {
     spark
       .sql(s"DESC DATABASE $dbName")

Reply via email to