This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 03abd9441 [core] Add batch add partitions in MetastoreClient (#4480)
03abd9441 is described below
commit 03abd9441acac41d64d0ed7835cacc0122767342
Author: Zouxxyy <[email protected]>
AuthorDate: Fri Nov 8 15:35:18 2024 +0800
[core] Add batch add partitions in MetastoreClient (#4480)
---
.../metastore/AddPartitionCommitCallback.java | 40 +++++++++++++----
.../apache/paimon/metastore/MetastoreClient.java | 14 ++++++
.../apache/paimon/hive/HiveMetastoreClient.java | 52 +++++++++++++++-------
.../apache/paimon/spark/PaimonSparkTestBase.scala | 24 ++++------
.../org/apache/paimon/spark/sql/DDLTestBase.scala | 8 ++--
.../spark/sql/DDLWithHiveCatalogTestBase.scala | 45 +++++++++++++++++++
6 files changed, 140 insertions(+), 43 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/metastore/AddPartitionCommitCallback.java b/paimon-core/src/main/java/org/apache/paimon/metastore/AddPartitionCommitCallback.java
index 4f7d3d554..06002161a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/metastore/AddPartitionCommitCallback.java
+++ b/paimon-core/src/main/java/org/apache/paimon/metastore/AddPartitionCommitCallback.java
@@ -30,7 +30,10 @@ import org.apache.paimon.shade.guava30.com.google.common.cache.Cache;
import org.apache.paimon.shade.guava30.com.google.common.cache.CacheBuilder;
import java.time.Duration;
+import java.util.ArrayList;
import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
/** A {@link CommitCallback} to add newly created partitions to metastore. */
public class AddPartitionCommitCallback implements CommitCallback {
@@ -52,19 +55,21 @@ public class AddPartitionCommitCallback implements CommitCallback {
@Override
public void call(List<ManifestEntry> committedEntries, Snapshot snapshot) {
- committedEntries.stream()
- .filter(e -> FileKind.ADD.equals(e.kind()))
- .map(ManifestEntry::partition)
- .distinct()
- .forEach(this::addPartition);
+ Set<BinaryRow> partitions =
+ committedEntries.stream()
+ .filter(e -> FileKind.ADD.equals(e.kind()))
+ .map(ManifestEntry::partition)
+ .collect(Collectors.toSet());
+ addPartitions(partitions);
}
@Override
public void retry(ManifestCommittable committable) {
- committable.fileCommittables().stream()
- .map(CommitMessage::partition)
- .distinct()
- .forEach(this::addPartition);
+ Set<BinaryRow> partitions =
+ committable.fileCommittables().stream()
+ .map(CommitMessage::partition)
+ .collect(Collectors.toSet());
+ addPartitions(partitions);
}
private void addPartition(BinaryRow partition) {
@@ -81,6 +86,23 @@ public class AddPartitionCommitCallback implements CommitCallback {
}
}
+ private void addPartitions(Set<BinaryRow> partitions) {
+ try {
+ List<BinaryRow> newPartitions = new ArrayList<>();
+ for (BinaryRow partition : partitions) {
+ if (!cache.get(partition, () -> false)) {
+ newPartitions.add(partition);
+ }
+ }
+ if (!newPartitions.isEmpty()) {
+ client.addPartitions(newPartitions);
+ newPartitions.forEach(partition -> cache.put(partition, true));
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
@Override
public void close() throws Exception {
client.close();
diff --git a/paimon-core/src/main/java/org/apache/paimon/metastore/MetastoreClient.java b/paimon-core/src/main/java/org/apache/paimon/metastore/MetastoreClient.java
index ac12bfc73..60e28c59f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/metastore/MetastoreClient.java
+++ b/paimon-core/src/main/java/org/apache/paimon/metastore/MetastoreClient.java
@@ -22,6 +22,7 @@ import org.apache.paimon.data.BinaryRow;
import java.io.Serializable;
import java.util.LinkedHashMap;
+import java.util.List;
import java.util.Map;
/**
@@ -32,8 +33,21 @@ public interface MetastoreClient extends AutoCloseable {
void addPartition(BinaryRow partition) throws Exception;
+ default void addPartitions(List<BinaryRow> partitions) throws Exception {
+ for (BinaryRow partition : partitions) {
+ addPartition(partition);
+ }
+ }
+
void addPartition(LinkedHashMap<String, String> partitionSpec) throws Exception;
+ default void addPartitionsSpec(List<LinkedHashMap<String, String>> partitionSpecsList)
+ throws Exception {
+ for (LinkedHashMap<String, String> partitionSpecs : partitionSpecsList) {
+ addPartition(partitionSpecs);
+ }
+ }
+
void deletePartition(LinkedHashMap<String, String> partitionSpec) throws Exception;
void markDone(LinkedHashMap<String, String> partitionSpec) throws Exception;
diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveMetastoreClient.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveMetastoreClient.java
index 5856515bb..cb70e0191 100644
--- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveMetastoreClient.java
+++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveMetastoreClient.java
@@ -41,6 +41,7 @@ import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.stream.Collectors;
/** {@link MetastoreClient} for Hive tables. */
public class HiveMetastoreClient implements MetastoreClient {
@@ -81,6 +82,14 @@ public class HiveMetastoreClient implements MetastoreClient {
addPartition(partitionComputer.generatePartValues(partition));
}
+ @Override
+ public void addPartitions(List<BinaryRow> partitions) throws Exception {
+ addPartitionsSpec(
+ partitions.stream()
+ .map(partitionComputer::generatePartValues)
+ .collect(Collectors.toList()));
+ }
+
@Override
public void addPartition(LinkedHashMap<String, String> partitionSpec) throws Exception {
List<String> partitionValues = new ArrayList<>(partitionSpec.values());
@@ -94,25 +103,23 @@ public class HiveMetastoreClient implements MetastoreClient {
// do nothing if the partition already exists
} catch (NoSuchObjectException e) {
// partition not found, create new partition
- StorageDescriptor newSd = new StorageDescriptor(sd);
- newSd.setLocation(
- sd.getLocation()
- + "/"
- + PartitionPathUtils.generatePartitionPath(partitionSpec));
-
- Partition hivePartition = new Partition();
- hivePartition.setDbName(identifier.getDatabaseName());
- hivePartition.setTableName(identifier.getTableName());
- hivePartition.setValues(partitionValues);
- hivePartition.setSd(newSd);
- int currentTime = (int) (System.currentTimeMillis() / 1000);
- hivePartition.setCreateTime(currentTime);
- hivePartition.setLastAccessTime(currentTime);
-
+ Partition hivePartition =
+ toHivePartition(partitionSpec, (int) (System.currentTimeMillis() / 1000));
clients.execute(client -> client.add_partition(hivePartition));
}
}
+ @Override
+ public void addPartitionsSpec(List<LinkedHashMap<String, String>> partitionSpecsList)
+ throws Exception {
+ int currentTime = (int) (System.currentTimeMillis() / 1000);
+ List<Partition> hivePartitions =
+ partitionSpecsList.stream()
+ .map(partitionSpec -> toHivePartition(partitionSpec, currentTime))
+ .collect(Collectors.toList());
+ clients.execute(client -> client.add_partitions(hivePartitions, true, false));
+ }
+
@Override
public void alterPartition(
LinkedHashMap<String, String> partitionSpec,
@@ -179,6 +186,21 @@ public class HiveMetastoreClient implements MetastoreClient {
return clients.run(client -> client);
}
+ private Partition toHivePartition(
+ LinkedHashMap<String, String> partitionSpec, int currentTime) {
+ Partition hivePartition = new Partition();
+ StorageDescriptor newSd = new StorageDescriptor(sd);
+ newSd.setLocation(
+ sd.getLocation() + "/" + PartitionPathUtils.generatePartitionPath(partitionSpec));
+ hivePartition.setDbName(identifier.getDatabaseName());
+ hivePartition.setTableName(identifier.getTableName());
+ hivePartition.setValues(new ArrayList<>(partitionSpec.values()));
+ hivePartition.setSd(newSd);
+ hivePartition.setCreateTime(currentTime);
+ hivePartition.setLastAccessTime(currentTime);
+ return hivePartition;
+ }
+
/** Factory to create {@link HiveMetastoreClient}. */
public static class Factory implements MetastoreClient.Factory {
diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonSparkTestBase.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonSparkTestBase.scala
index 3deb91cbc..9b4a34425 100644
--- a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonSparkTestBase.scala
+++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonSparkTestBase.scala
@@ -18,9 +18,8 @@
package org.apache.paimon.spark
-import org.apache.paimon.catalog.{Catalog, CatalogContext, CatalogFactory, Identifier}
-import org.apache.paimon.options.{CatalogOptions, Options}
-import org.apache.paimon.spark.catalog.Catalogs
+import org.apache.paimon.catalog.{Catalog, Identifier}
+import org.apache.paimon.spark.catalog.WithPaimonCatalog
import org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions
import org.apache.paimon.spark.sql.{SparkVersionSupport, WithTableOptions}
import org.apache.paimon.table.FileStoreTable
@@ -36,7 +35,6 @@ import org.scalactic.source.Position
import org.scalatest.Tag
import java.io.File
-import java.util.{HashMap => JHashMap}
import java.util.TimeZone
import scala.util.Random
@@ -49,7 +47,9 @@ class PaimonSparkTestBase
protected lazy val tempDBDir: File = Utils.createTempDir
- protected lazy val catalog: Catalog = initCatalog()
+ protected def paimonCatalog: Catalog = {
+ spark.sessionState.catalogManager.currentCatalog.asInstanceOf[WithPaimonCatalog].paimonCatalog()
+ }
protected val dbName0: String = "test"
@@ -122,18 +122,12 @@ class PaimonSparkTestBase
super.test(testName, testTags: _*)(testFun)(pos)
}
- private def initCatalog(): Catalog = {
- val currentCatalog = spark.sessionState.catalogManager.currentCatalog.name()
- val options =
- new JHashMap[String, String](Catalogs.catalogOptions(currentCatalog, spark.sessionState.conf))
- options.put(CatalogOptions.CACHE_ENABLED.key(), "false")
- val catalogContext =
- CatalogContext.create(Options.fromMap(options), spark.sessionState.newHadoopConf())
- CatalogFactory.createCatalog(catalogContext)
+ def loadTable(tableName: String): FileStoreTable = {
+ loadTable(dbName0, tableName)
}
- def loadTable(tableName: String): FileStoreTable = {
- catalog.getTable(Identifier.create(dbName0, tableName)).asInstanceOf[FileStoreTable]
+ def loadTable(dbName: String, tableName: String): FileStoreTable = {
+ paimonCatalog.getTable(Identifier.create(dbName, tableName)).asInstanceOf[FileStoreTable]
}
protected def createRelationV2(tableName: String): LogicalPlan = {
diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DDLTestBase.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DDLTestBase.scala
index 0006e45ec..cf1a71d51 100644
--- a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DDLTestBase.scala
+++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DDLTestBase.scala
@@ -352,7 +352,7 @@ abstract class DDLTestBase extends PaimonSparkTestBase {
.column("ts", DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE())
.column("ts_ntz", DataTypes.TIMESTAMP())
.build
- catalog.createTable(identifier, schema, false)
+ paimonCatalog.createTable(identifier, schema, false)
sql(
s"INSERT INTO paimon_tbl VALUES (timestamp'2024-01-01 00:00:00', timestamp_ntz'2024-01-01 00:00:00')")
@@ -370,7 +370,7 @@ abstract class DDLTestBase extends PaimonSparkTestBase {
// Due to previous design, read timestamp ltz type with spark 3.3 and below will cause problems,
// skip testing it
if (gteqSpark3_4) {
- val table = catalog.getTable(identifier)
+ val table = paimonCatalog.getTable(identifier)
val builder = table.newReadBuilder.withProjection(Array[Int](0, 1))
val splits = builder.newScan().plan().splits()
builder.newRead
@@ -405,7 +405,7 @@ abstract class DDLTestBase extends PaimonSparkTestBase {
// Due to previous design, read timestamp ltz type with spark 3.3 and below will cause problems,
// skip testing it
if (gteqSpark3_4) {
- val table = catalog.getTable(identifier)
+ val table = paimonCatalog.getTable(identifier)
val builder = table.newReadBuilder.withProjection(Array[Int](0, 1))
val splits = builder.newScan().plan().splits()
builder.newRead
@@ -423,7 +423,7 @@ abstract class DDLTestBase extends PaimonSparkTestBase {
}
}
} finally {
- catalog.dropTable(identifier, true)
+ paimonCatalog.dropTable(identifier, true)
}
}
diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DDLWithHiveCatalogTestBase.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DDLWithHiveCatalogTestBase.scala
index fc1d855ec..7478f9628 100644
--- a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DDLWithHiveCatalogTestBase.scala
+++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DDLWithHiveCatalogTestBase.scala
@@ -18,6 +18,7 @@
package org.apache.paimon.spark.sql
+import org.apache.paimon.hive.HiveMetastoreClient
import org.apache.paimon.spark.PaimonHiveTestBase
import org.apache.paimon.table.FileStoreTable
@@ -251,6 +252,50 @@ abstract class DDLWithHiveCatalogTestBase extends PaimonHiveTestBase {
}
}
+ test("Paimon DDL with hive catalog: sync partitions to HMS") {
+ Seq(sparkCatalogName, paimonHiveCatalogName).foreach {
+ catalogName =>
+ val dbName = "default"
+ val tblName = "t"
+ spark.sql(s"USE $catalogName.$dbName")
+ withTable(tblName) {
+ spark.sql(s"""
+ |CREATE TABLE $tblName (id INT, pt INT)
+ |USING PAIMON
+ |TBLPROPERTIES ('metastore.partitioned-table' = 'true')
+ |PARTITIONED BY (pt)
+ |""".stripMargin)
+
+ val metastoreClient = loadTable(dbName, tblName)
+ .catalogEnvironment()
+ .metastoreClientFactory()
+ .create()
+ .asInstanceOf[HiveMetastoreClient]
+ .client()
+
+ spark.sql(s"INSERT INTO $tblName VALUES (1, 1), (2, 2), (3, 3)")
+ // check partitions in paimon
+ checkAnswer(
+ spark.sql(s"show partitions $tblName"),
+ Seq(Row("pt=1"), Row("pt=2"), Row("pt=3")))
+ // check partitions in HMS
+ assert(metastoreClient.listPartitions(dbName, tblName, 100).size() == 3)
+
+ spark.sql(s"INSERT INTO $tblName VALUES (4, 3), (5, 4)")
+ checkAnswer(
+ spark.sql(s"show partitions $tblName"),
+ Seq(Row("pt=1"), Row("pt=2"), Row("pt=3"), Row("pt=4")))
+ assert(metastoreClient.listPartitions(dbName, tblName, 100).size() == 4)
+
+ spark.sql(s"ALTER TABLE $tblName DROP PARTITION (pt=1)")
+ checkAnswer(
+ spark.sql(s"show partitions $tblName"),
+ Seq(Row("pt=2"), Row("pt=3"), Row("pt=4")))
+ assert(metastoreClient.listPartitions(dbName, tblName, 100).size() == 3)
+ }
+ }
+ }
+
def getDatabaseLocation(dbName: String): String = {
spark
.sql(s"DESC DATABASE $dbName")