This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 57d1ac55f4 [core] format table: insert data after commit sync
partitions to hms (#6584)
57d1ac55f4 is described below
commit 57d1ac55f472f22941da6bdbb50b6e3d0e2b9f77
Author: jerry <[email protected]>
AuthorDate: Mon Nov 17 17:47:50 2025 +0800
[core] format table: insert data after commit sync partitions to hms (#6584)
---
.../shortcodes/generated/core_configuration.html | 6 +++
.../main/java/org/apache/paimon/CoreOptions.java | 9 ++++
.../org/apache/paimon/catalog/CatalogUtils.java | 8 ++-
.../java/org/apache/paimon/table/FormatTable.java | 32 ++++++++++--
.../table/format/FormatBatchWriteBuilder.java | 11 +++--
.../paimon/table/format/FormatTableCommit.java | 57 +++++++++++++++++++++-
.../java/org/apache/paimon/hive/HiveCatalog.java | 19 +++++---
.../apache/paimon/spark/PaimonHiveTestBase.scala | 6 +--
.../paimon/spark/sql/FormatTableTestBase.scala | 26 +++++++++-
9 files changed, 153 insertions(+), 21 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html
b/docs/layouts/shortcodes/generated/core_configuration.html
index 889adf5a86..0698b72f0f 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -548,6 +548,12 @@ under the License.
<td>Boolean</td>
<td>Whether to force the use of lookup for compaction.</td>
</tr>
+ <tr>
+ <td><h5>format-table.commit-hive-sync-url</h5></td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>Format table commit hive sync uri.</td>
+ </tr>
<tr>
<td><h5>format-table.file.compression</h5></td>
<td style="word-wrap: break-word;">(none)</td>
diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
index 590b7b6dd8..464082cf6e 100644
--- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
@@ -2030,6 +2030,11 @@ public class CoreOptions implements Serializable {
.noDefaultValue()
.withFallbackKeys(FILE_COMPRESSION.key())
.withDescription("Format table file compression.");
+    // Hive metastore URI to which a format table commit registers the partitions it wrote;
+    // when unset (no default), FormatTableCommit creates no Hive catalog and skips the sync.
+    // NOTE(review): the key spells "url" while the constant name and description say "uri";
+    // consider aligning them (e.g. via a fallback key) before this option is released.
+ public static final ConfigOption<String> FORMAT_TABLE_COMMIT_HIVE_SYNC_URI
=
+ ConfigOptions.key("format-table.commit-hive-sync-url")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("Format table commit hive sync uri.");
public static final ConfigOption<String> BLOB_FIELD =
key("blob-field")
@@ -2326,6 +2331,10 @@ public class CoreOptions implements Serializable {
}
}
+    /**
+     * Returns the Hive metastore URI set via {@code format-table.commit-hive-sync-url}, or
+     * {@code null} when the option is not configured (it has no default value).
+     */
+ public String formatTableCommitSyncPartitionHiveUri() {
+ return options.get(FORMAT_TABLE_COMMIT_HIVE_SYNC_URI);
+ }
+
public MemorySize fileReaderAsyncThreshold() {
return options.get(FILE_READER_ASYNC_THRESHOLD);
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java
b/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java
index 3517c18f3c..138769f0c9 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java
@@ -239,7 +239,7 @@ public class CatalogUtils {
Function<Path, FileIO> dataFileIO = metadata.isExternal() ?
externalFileIO : internalFileIO;
if (options.type() == TableType.FORMAT_TABLE) {
- return toFormatTable(identifier, schema, dataFileIO);
+ return toFormatTable(identifier, schema, dataFileIO,
catalogContext);
}
if (options.type() == TableType.OBJECT_TABLE) {
@@ -379,7 +379,10 @@ public class CatalogUtils {
}
private static FormatTable toFormatTable(
- Identifier identifier, TableSchema schema, Function<Path, FileIO>
fileIO) {
+ Identifier identifier,
+ TableSchema schema,
+ Function<Path, FileIO> fileIO,
+ CatalogContext catalogContext) {
Map<String, String> options = schema.options();
FormatTable.Format format =
FormatTable.parseFormat(
@@ -396,6 +399,7 @@ public class CatalogUtils {
.format(format)
.options(options)
.comment(schema.comment())
+ .catalogContext(catalogContext)
.build();
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/FormatTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/FormatTable.java
index 2963dbb04d..1e481fee45 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/FormatTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/FormatTable.java
@@ -20,6 +20,7 @@ package org.apache.paimon.table;
import org.apache.paimon.Snapshot;
import org.apache.paimon.annotation.Public;
+import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.manifest.IndexManifestEntry;
@@ -68,6 +69,8 @@ public interface FormatTable extends Table {
@Override
FormatTable copy(Map<String, String> dynamicOptions);
+    /**
+     * Returns the catalog context this table was built with. May be {@code null} when the
+     * table was built without calling {@code Builder#catalogContext(CatalogContext)}.
+     */
+ CatalogContext catalogContext();
+
/** Currently supported formats. */
enum Format {
ORC,
@@ -105,6 +108,7 @@ public interface FormatTable extends Table {
private Format format;
private Map<String, String> options;
@Nullable private String comment;
+ private CatalogContext catalogContext;
public Builder fileIO(FileIO fileIO) {
this.fileIO = fileIO;
@@ -146,9 +150,22 @@ public interface FormatTable extends Table {
return this;
}
+    /** Sets the catalog context handed to the built table; optional, left {@code null} if unset. */
+ public Builder catalogContext(CatalogContext catalogContext) {
+ this.catalogContext = catalogContext;
+ return this;
+ }
+
public FormatTable build() {
return new FormatTableImpl(
- fileIO, identifier, rowType, partitionKeys, location,
format, options, comment);
+ fileIO,
+ identifier,
+ rowType,
+ partitionKeys,
+ location,
+ format,
+ options,
+ comment,
+ catalogContext);
}
}
@@ -165,6 +182,7 @@ public interface FormatTable extends Table {
private final Format format;
private final Map<String, String> options;
@Nullable private final String comment;
+        // Assigned once in the constructor; null when the builder was given no catalog context.
+        @Nullable private final CatalogContext catalogContext;
public FormatTableImpl(
FileIO fileIO,
@@ -174,7 +192,8 @@ public interface FormatTable extends Table {
String location,
Format format,
Map<String, String> options,
- @Nullable String comment) {
+ @Nullable String comment,
+ CatalogContext catalogContext) {
this.fileIO = fileIO;
this.identifier = identifier;
this.rowType = rowType;
@@ -183,6 +202,7 @@ public interface FormatTable extends Table {
this.format = format;
this.options = options;
this.comment = comment;
+ this.catalogContext = catalogContext;
}
@Override
@@ -247,7 +267,13 @@ public interface FormatTable extends Table {
location,
format,
newOptions,
- comment);
+ comment,
+ catalogContext);
+ }
+
+ @Override
+ public CatalogContext catalogContext() {
+        // May be null: the builder does not require a catalog context.
+ return this.catalogContext;
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/format/FormatBatchWriteBuilder.java
b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatBatchWriteBuilder.java
index 78fef13ab6..924e67dbdf 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/format/FormatBatchWriteBuilder.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatBatchWriteBuilder.java
@@ -19,6 +19,7 @@
package org.apache.paimon.table.format;
import org.apache.paimon.CoreOptions;
+import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.table.FormatTable;
import org.apache.paimon.table.sink.BatchTableCommit;
import org.apache.paimon.table.sink.BatchTableWrite;
@@ -74,15 +75,19 @@ public class FormatBatchWriteBuilder implements
BatchWriteBuilder {
@Override
public BatchTableCommit newCommit() {
- boolean formatTablePartitionOnlyValueInPath =
- (new
CoreOptions(table.options())).formatTablePartitionOnlyValueInPath();
+ CoreOptions options = new CoreOptions(table.options());
+ boolean formatTablePartitionOnlyValueInPath =
options.formatTablePartitionOnlyValueInPath();
+ String syncHiveUri = options.formatTableCommitSyncPartitionHiveUri();
return new FormatTableCommit(
table.location(),
table.partitionKeys(),
table.fileIO(),
formatTablePartitionOnlyValueInPath,
overwrite,
- staticPartition);
+ Identifier.fromString(table.fullName()),
+ staticPartition,
+ syncHiveUri,
+ table.catalogContext());
}
@Override
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableCommit.java
b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableCommit.java
index 5ff58d8b9a..80182f8246 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableCommit.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableCommit.java
@@ -18,22 +18,31 @@
package org.apache.paimon.table.format;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.CatalogFactory;
+import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.FileStatus;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.TwoPhaseOutputStream;
import org.apache.paimon.metrics.MetricRegistry;
+import org.apache.paimon.options.CatalogOptions;
+import org.apache.paimon.options.Options;
import org.apache.paimon.stats.Statistics;
import org.apache.paimon.table.sink.BatchTableCommit;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.TableCommit;
+import org.apache.paimon.utils.PartitionPathUtils;
import javax.annotation.Nullable;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashSet;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -50,6 +59,8 @@ public class FormatTableCommit implements BatchTableCommit {
private List<String> partitionKeys;
protected Map<String, String> staticPartitions;
protected boolean overwrite = false;
+ private Catalog hiveCatalog;
+ private Identifier tableIdentifier;
public FormatTableCommit(
String location,
@@ -57,7 +68,10 @@ public class FormatTableCommit implements BatchTableCommit {
FileIO fileIO,
boolean formatTablePartitionOnlyValueInPath,
boolean overwrite,
- @Nullable Map<String, String> staticPartitions) {
+ Identifier tableIdentifier,
+ @Nullable Map<String, String> staticPartitions,
+ @Nullable String syncHiveUri,
+ CatalogContext catalogContext) {
this.location = location;
this.fileIO = fileIO;
this.formatTablePartitionOnlyValueInPath =
formatTablePartitionOnlyValueInPath;
@@ -65,6 +79,22 @@ public class FormatTableCommit implements BatchTableCommit {
this.staticPartitions = staticPartitions;
this.overwrite = overwrite;
this.partitionKeys = partitionKeys;
+        this.tableIdentifier = tableIdentifier;
+        // Partition sync to HMS is opt-in: without a URI no Hive catalog is created and the
+        // commit only moves files.
+        if (syncHiveUri != null) {
+            try {
+                Options options = new Options();
+                options.set(CatalogOptions.URI, syncHiveUri);
+                options.set(CatalogOptions.METASTORE, "hive");
+                // A FormatTable built without a catalog context has no Hadoop configuration
+                // to pass on; fall back to a context created from the options alone instead
+                // of failing with a NullPointerException.
+                CatalogContext context =
+                        catalogContext == null
+                                ? CatalogContext.create(options)
+                                : CatalogContext.create(options, catalogContext.hadoopConf());
+                this.hiveCatalog = CatalogFactory.createCatalog(context);
+            } catch (Exception e) {
+                throw new RuntimeException(
+                        String.format(
+                                "Failed to initialize Hive catalog with URI: %s", syncHiveUri),
+                        e);
+            }
+        }
}
@Override
@@ -81,6 +111,8 @@ public class FormatTableCommit implements BatchTableCommit {
}
}
+ Set<Map<String, String>> partitionSpecs = new HashSet<>();
+
if (staticPartitions != null && !staticPartitions.isEmpty()) {
Path partitionPath =
buildPartitionPath(
@@ -88,7 +120,7 @@ public class FormatTableCommit implements BatchTableCommit {
staticPartitions,
formatTablePartitionOnlyValueInPath,
partitionKeys);
-
+                // Only a static spec covering every partition key names a concrete Hive
+                // partition; a partial spec (e.g. INSERT ... PARTITION (a=1) on a table
+                // partitioned by (a, b)) is completed from the committed file paths below.
+                if (partitionKeys != null && staticPartitions.size() == partitionKeys.size()) {
+                    partitionSpecs.add(staticPartitions);
+                }
if (overwrite) {
deletePreviousDataFile(partitionPath);
}
@@ -107,10 +139,21 @@ public class FormatTableCommit implements
BatchTableCommit {
for (TwoPhaseOutputStream.Committer committer : committers) {
committer.commit(this.fileIO);
+ if (partitionKeys != null && !partitionKeys.isEmpty() &&
hiveCatalog != null) {
+ partitionSpecs.add(
+ extractPartitionSpecFromPath(
+ committer.targetPath().getParent(),
partitionKeys));
+ }
}
for (TwoPhaseOutputStream.Committer committer : committers) {
committer.clean(this.fileIO);
}
+            // Register every partition touched by this commit in one metastore call instead
+            // of one round trip (plus table lookup) per partition.
+            if (hiveCatalog != null && !partitionSpecs.isEmpty()) {
+                hiveCatalog.createPartitions(tableIdentifier, new ArrayList<>(partitionSpecs));
+            }
} catch (Exception e) {
this.abort(commitMessages);
@@ -118,6 +161,16 @@ public class FormatTableCommit implements BatchTableCommit
{
}
}
+    /**
+     * Derives the partition spec of a committed file from its parent directory.
+     *
+     * <p>When {@code formatTablePartitionOnlyValueInPath} is set, directory names hold only
+     * values and are resolved against {@code partitionKeys} (presumably positionally — see
+     * {@code PartitionPathUtils#extractPartitionSpecFromPathOnlyValue}); otherwise the path is
+     * parsed by {@code PartitionPathUtils#extractPartitionSpecFromPath}.
+     *
+     * @param partitionPath parent directory of a committed data file
+     * @param partitionKeys the table's partition keys
+     * @return the extracted spec, in the order produced by {@code PartitionPathUtils}
+     */
+ private LinkedHashMap<String, String> extractPartitionSpecFromPath(
+ Path partitionPath, List<String> partitionKeys) {
+ if (formatTablePartitionOnlyValueInPath) {
+ return PartitionPathUtils.extractPartitionSpecFromPathOnlyValue(
+ partitionPath, partitionKeys);
+ } else {
+ return
PartitionPathUtils.extractPartitionSpecFromPath(partitionPath);
+ }
+ }
+
private static Path buildPartitionPath(
String location,
Map<String, String> partitionSpec,
diff --git
a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
index bc99c35a4a..f225d5baf7 100644
---
a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
+++
b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
@@ -516,13 +516,7 @@ public class HiveCatalog extends AbstractCatalog {
String tagToPartitionField = table.coreOptions().tagToPartitionField();
if (tagToPartitionField != null) {
try {
- List<Partition> partitions =
- clients.run(
- client ->
- client.listPartitions(
- identifier.getDatabaseName(),
- identifier.getTableName(),
- Short.MAX_VALUE));
+ List<Partition> partitions = listPartitionsFromHms(identifier);
return partitions.stream()
.map(
part -> {
@@ -558,6 +552,17 @@ public class HiveCatalog extends AbstractCatalog {
return listPartitionsFromFileSystem(table);
}
+    /**
+     * Lists the partitions registered in the Hive metastore for {@code identifier}, asking
+     * the client for at most {@code Short.MAX_VALUE} entries. Used by partition listing in
+     * this catalog and exposed publicly so tests can inspect HMS state directly.
+     */
+ @VisibleForTesting
+ public List<Partition> listPartitionsFromHms(Identifier identifier)
+ throws TException, InterruptedException {
+ return clients.run(
+ client ->
+ client.listPartitions(
+ identifier.getDatabaseName(),
+ identifier.getTableName(),
+ Short.MAX_VALUE));
+ }
+
private List<Map<String, String>> removePartitionsExistsInOtherBranches(
Identifier identifier, List<Map<String, String>> inputs) throws
TableNotExistException {
FileStoreTable mainTable =
diff --git
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonHiveTestBase.scala
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonHiveTestBase.scala
index ee423a0a59..3845a6b8b9 100644
---
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonHiveTestBase.scala
+++
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonHiveTestBase.scala
@@ -31,8 +31,6 @@ import java.io.File
class PaimonHiveTestBase extends PaimonSparkTestBase {
- import PaimonHiveTestBase._
-
protected lazy val tempHiveDBDir: File = Utils.createTempDir
protected lazy val testHiveMetastore: TestHiveMetastore = new
TestHiveMetastore
@@ -43,6 +41,8 @@ class PaimonHiveTestBase extends PaimonSparkTestBase {
protected val hiveDbName: String = "test_hive"
+ val hiveUri: String = PaimonHiveTestBase.hiveUri
+
/**
* Add spark_catalog ([[SparkGenericCatalog]] in hive) and paimon_hive
([[SparkCatalog]] in hive)
* catalog
@@ -61,7 +61,7 @@ class PaimonHiveTestBase extends PaimonSparkTestBase {
}
override protected def beforeAll(): Unit = {
- testHiveMetastore.start(hivePort)
+ testHiveMetastore.start(PaimonHiveTestBase.hivePort)
super.beforeAll()
spark.sql(s"USE $sparkCatalogName")
spark.sql(s"CREATE DATABASE IF NOT EXISTS $hiveDbName")
diff --git
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/FormatTableTestBase.scala
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/FormatTableTestBase.scala
index 4a895b10b6..573f3c8899 100644
---
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/FormatTableTestBase.scala
+++
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/FormatTableTestBase.scala
@@ -18,8 +18,9 @@
package org.apache.paimon.spark.sql
-import org.apache.paimon.catalog.Identifier
+import org.apache.paimon.catalog.{DelegateCatalog, Identifier}
import org.apache.paimon.fs.Path
+import org.apache.paimon.hive.HiveCatalog
import org.apache.paimon.spark.PaimonHiveTestBase
import org.apache.paimon.table.FormatTable
import org.apache.paimon.utils.CompressUtils
@@ -45,6 +46,29 @@ abstract class FormatTableTestBase extends
PaimonHiveTestBase {
}
}
+  test("Format table: check partition sync") {
+    val tableName = "t"
+    withTable(tableName) {
+      val hiveCatalog =
+        paimonCatalog.asInstanceOf[DelegateCatalog].wrapped().asInstanceOf[HiveCatalog]
+      val identifier = Identifier.create(hiveDbName, tableName)
+
+      // Without the sync option, committing data must not register partitions in HMS.
+      sql(
+        s"CREATE TABLE $tableName (f0 INT) USING CSV PARTITIONED BY (`ds` bigint) " +
+          "TBLPROPERTIES ('metastore.partitioned-table'='true')")
+      sql(s"INSERT INTO $tableName VALUES (1, 2023)")
+      checkAnswer(sql(s"SELECT * FROM $tableName"), Seq(Row(1, 2023L)))
+      assert(hiveCatalog.listPartitionsFromHms(identifier).isEmpty)
+      sql(s"DROP TABLE $tableName")
+
+      // With the sync option, exactly the written partition must appear in HMS.
+      sql(
+        s"CREATE TABLE $tableName (f0 INT) USING CSV PARTITIONED BY (`ds` bigint) " +
+          s"TBLPROPERTIES ('format-table.commit-hive-sync-url'='$hiveUri', " +
+          "'metastore.partitioned-table'='true')")
+      val ds = 2024L
+      sql(s"INSERT INTO $tableName VALUES (1, $ds)")
+      checkAnswer(sql(s"SELECT * FROM $tableName"), Seq(Row(1, ds)))
+      val partitions = hiveCatalog.listPartitionsFromHms(identifier)
+      assert(partitions.size == 1)
+      assert(partitions.get(0).getValues.get(0) == ds.toString)
+    }
+  }
+
test("Format table: write partitioned table") {
for (
(format, compression) <- Seq(