This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 57d1ac55f4 [core] format table: insert data after commit sync partitions to hms (#6584)
57d1ac55f4 is described below

commit 57d1ac55f472f22941da6bdbb50b6e3d0e2b9f77
Author: jerry <[email protected]>
AuthorDate: Mon Nov 17 17:47:50 2025 +0800

    [core] format table: insert data after commit sync partitions to hms (#6584)
---
 .../shortcodes/generated/core_configuration.html   |  6 +++
 .../main/java/org/apache/paimon/CoreOptions.java   |  9 ++++
 .../org/apache/paimon/catalog/CatalogUtils.java    |  8 ++-
 .../java/org/apache/paimon/table/FormatTable.java  | 32 ++++++++++--
 .../table/format/FormatBatchWriteBuilder.java      | 11 +++--
 .../paimon/table/format/FormatTableCommit.java     | 57 +++++++++++++++++++++-
 .../java/org/apache/paimon/hive/HiveCatalog.java   | 19 +++++---
 .../apache/paimon/spark/PaimonHiveTestBase.scala   |  6 +--
 .../paimon/spark/sql/FormatTableTestBase.scala     | 26 +++++++++-
 9 files changed, 153 insertions(+), 21 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html
index 889adf5a86..0698b72f0f 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -548,6 +548,12 @@ under the License.
             <td>Boolean</td>
             <td>Whether to force the use of lookup for compaction.</td>
         </tr>
+        <tr>
+            <td><h5>format-table.commit-hive-sync-url</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>String</td>
+            <td>Format table commit hive sync uri.</td>
+        </tr>
         <tr>
             <td><h5>format-table.file.compression</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
index 590b7b6dd8..464082cf6e 100644
--- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
@@ -2030,6 +2030,11 @@ public class CoreOptions implements Serializable {
                     .noDefaultValue()
                     .withFallbackKeys(FILE_COMPRESSION.key())
                     .withDescription("Format table file compression.");
+    public static final ConfigOption<String> FORMAT_TABLE_COMMIT_HIVE_SYNC_URI 
=
+            ConfigOptions.key("format-table.commit-hive-sync-url")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Format table commit hive sync uri.");
 
     public static final ConfigOption<String> BLOB_FIELD =
             key("blob-field")
@@ -2326,6 +2331,10 @@ public class CoreOptions implements Serializable {
         }
     }
 
+    public String formatTableCommitSyncPartitionHiveUri() {
+        return options.get(FORMAT_TABLE_COMMIT_HIVE_SYNC_URI);
+    }
+
     public MemorySize fileReaderAsyncThreshold() {
         return options.get(FILE_READER_ASYNC_THRESHOLD);
     }
diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java b/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java
index 3517c18f3c..138769f0c9 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java
@@ -239,7 +239,7 @@ public class CatalogUtils {
         Function<Path, FileIO> dataFileIO = metadata.isExternal() ? 
externalFileIO : internalFileIO;
 
         if (options.type() == TableType.FORMAT_TABLE) {
-            return toFormatTable(identifier, schema, dataFileIO);
+            return toFormatTable(identifier, schema, dataFileIO, 
catalogContext);
         }
 
         if (options.type() == TableType.OBJECT_TABLE) {
@@ -379,7 +379,10 @@ public class CatalogUtils {
     }
 
     private static FormatTable toFormatTable(
-            Identifier identifier, TableSchema schema, Function<Path, FileIO> 
fileIO) {
+            Identifier identifier,
+            TableSchema schema,
+            Function<Path, FileIO> fileIO,
+            CatalogContext catalogContext) {
         Map<String, String> options = schema.options();
         FormatTable.Format format =
                 FormatTable.parseFormat(
@@ -396,6 +399,7 @@ public class CatalogUtils {
                 .format(format)
                 .options(options)
                 .comment(schema.comment())
+                .catalogContext(catalogContext)
                 .build();
     }
 
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/FormatTable.java b/paimon-core/src/main/java/org/apache/paimon/table/FormatTable.java
index 2963dbb04d..1e481fee45 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/FormatTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/FormatTable.java
@@ -20,6 +20,7 @@ package org.apache.paimon.table;
 
 import org.apache.paimon.Snapshot;
 import org.apache.paimon.annotation.Public;
+import org.apache.paimon.catalog.CatalogContext;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.manifest.IndexManifestEntry;
@@ -68,6 +69,8 @@ public interface FormatTable extends Table {
     @Override
     FormatTable copy(Map<String, String> dynamicOptions);
 
+    CatalogContext catalogContext();
+
     /** Currently supported formats. */
     enum Format {
         ORC,
@@ -105,6 +108,7 @@ public interface FormatTable extends Table {
         private Format format;
         private Map<String, String> options;
         @Nullable private String comment;
+        private CatalogContext catalogContext;
 
         public Builder fileIO(FileIO fileIO) {
             this.fileIO = fileIO;
@@ -146,9 +150,22 @@ public interface FormatTable extends Table {
             return this;
         }
 
+        public Builder catalogContext(CatalogContext catalogContext) {
+            this.catalogContext = catalogContext;
+            return this;
+        }
+
         public FormatTable build() {
             return new FormatTableImpl(
-                    fileIO, identifier, rowType, partitionKeys, location, 
format, options, comment);
+                    fileIO,
+                    identifier,
+                    rowType,
+                    partitionKeys,
+                    location,
+                    format,
+                    options,
+                    comment,
+                    catalogContext);
         }
     }
 
@@ -165,6 +182,7 @@ public interface FormatTable extends Table {
         private final Format format;
         private final Map<String, String> options;
         @Nullable private final String comment;
+        private CatalogContext catalogContext;
 
         public FormatTableImpl(
                 FileIO fileIO,
@@ -174,7 +192,8 @@ public interface FormatTable extends Table {
                 String location,
                 Format format,
                 Map<String, String> options,
-                @Nullable String comment) {
+                @Nullable String comment,
+                CatalogContext catalogContext) {
             this.fileIO = fileIO;
             this.identifier = identifier;
             this.rowType = rowType;
@@ -183,6 +202,7 @@ public interface FormatTable extends Table {
             this.format = format;
             this.options = options;
             this.comment = comment;
+            this.catalogContext = catalogContext;
         }
 
         @Override
@@ -247,7 +267,13 @@ public interface FormatTable extends Table {
                     location,
                     format,
                     newOptions,
-                    comment);
+                    comment,
+                    catalogContext);
+        }
+
+        @Override
+        public CatalogContext catalogContext() {
+            return this.catalogContext;
         }
     }
 
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/format/FormatBatchWriteBuilder.java b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatBatchWriteBuilder.java
index 78fef13ab6..924e67dbdf 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/format/FormatBatchWriteBuilder.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatBatchWriteBuilder.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.table.format;
 
 import org.apache.paimon.CoreOptions;
+import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.table.FormatTable;
 import org.apache.paimon.table.sink.BatchTableCommit;
 import org.apache.paimon.table.sink.BatchTableWrite;
@@ -74,15 +75,19 @@ public class FormatBatchWriteBuilder implements BatchWriteBuilder {
 
     @Override
     public BatchTableCommit newCommit() {
-        boolean formatTablePartitionOnlyValueInPath =
-                (new 
CoreOptions(table.options())).formatTablePartitionOnlyValueInPath();
+        CoreOptions options = new CoreOptions(table.options());
+        boolean formatTablePartitionOnlyValueInPath = 
options.formatTablePartitionOnlyValueInPath();
+        String syncHiveUri = options.formatTableCommitSyncPartitionHiveUri();
         return new FormatTableCommit(
                 table.location(),
                 table.partitionKeys(),
                 table.fileIO(),
                 formatTablePartitionOnlyValueInPath,
                 overwrite,
-                staticPartition);
+                Identifier.fromString(table.fullName()),
+                staticPartition,
+                syncHiveUri,
+                table.catalogContext());
     }
 
     @Override
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableCommit.java b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableCommit.java
index 5ff58d8b9a..80182f8246 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableCommit.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableCommit.java
@@ -18,22 +18,31 @@
 
 package org.apache.paimon.table.format;
 
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.CatalogFactory;
+import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.FileStatus;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.fs.TwoPhaseOutputStream;
 import org.apache.paimon.metrics.MetricRegistry;
+import org.apache.paimon.options.CatalogOptions;
+import org.apache.paimon.options.Options;
 import org.apache.paimon.stats.Statistics;
 import org.apache.paimon.table.sink.BatchTableCommit;
 import org.apache.paimon.table.sink.CommitMessage;
 import org.apache.paimon.table.sink.TableCommit;
+import org.apache.paimon.utils.PartitionPathUtils;
 
 import javax.annotation.Nullable;
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashSet;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -50,6 +59,8 @@ public class FormatTableCommit implements BatchTableCommit {
     private List<String> partitionKeys;
     protected Map<String, String> staticPartitions;
     protected boolean overwrite = false;
+    private Catalog hiveCatalog;
+    private Identifier tableIdentifier;
 
     public FormatTableCommit(
             String location,
@@ -57,7 +68,10 @@ public class FormatTableCommit implements BatchTableCommit {
             FileIO fileIO,
             boolean formatTablePartitionOnlyValueInPath,
             boolean overwrite,
-            @Nullable Map<String, String> staticPartitions) {
+            Identifier tableIdentifier,
+            @Nullable Map<String, String> staticPartitions,
+            @Nullable String syncHiveUri,
+            CatalogContext catalogContext) {
         this.location = location;
         this.fileIO = fileIO;
         this.formatTablePartitionOnlyValueInPath = 
formatTablePartitionOnlyValueInPath;
@@ -65,6 +79,22 @@ public class FormatTableCommit implements BatchTableCommit {
         this.staticPartitions = staticPartitions;
         this.overwrite = overwrite;
         this.partitionKeys = partitionKeys;
+        this.tableIdentifier = tableIdentifier;
+        if (syncHiveUri != null) {
+            try {
+                Options options = new Options();
+                options.set(CatalogOptions.URI, syncHiveUri);
+                options.set(CatalogOptions.METASTORE, "hive");
+                CatalogContext context =
+                        CatalogContext.create(options, 
catalogContext.hadoopConf());
+                this.hiveCatalog = CatalogFactory.createCatalog(context);
+            } catch (Exception e) {
+                throw new RuntimeException(
+                        String.format(
+                                "Failed to initialize Hive catalog with URI: 
%s", syncHiveUri),
+                        e);
+            }
+        }
     }
 
     @Override
@@ -81,6 +111,8 @@ public class FormatTableCommit implements BatchTableCommit {
                 }
             }
 
+            Set<Map<String, String>> partitionSpecs = new HashSet<>();
+
             if (staticPartitions != null && !staticPartitions.isEmpty()) {
                 Path partitionPath =
                         buildPartitionPath(
@@ -88,7 +120,7 @@ public class FormatTableCommit implements BatchTableCommit {
                                 staticPartitions,
                                 formatTablePartitionOnlyValueInPath,
                                 partitionKeys);
-
+                partitionSpecs.add(staticPartitions);
                 if (overwrite) {
                     deletePreviousDataFile(partitionPath);
                 }
@@ -107,10 +139,21 @@ public class FormatTableCommit implements BatchTableCommit {
 
             for (TwoPhaseOutputStream.Committer committer : committers) {
                 committer.commit(this.fileIO);
+                if (partitionKeys != null && !partitionKeys.isEmpty() && 
hiveCatalog != null) {
+                    partitionSpecs.add(
+                            extractPartitionSpecFromPath(
+                                    committer.targetPath().getParent(), 
partitionKeys));
+                }
             }
             for (TwoPhaseOutputStream.Committer committer : committers) {
                 committer.clean(this.fileIO);
             }
+            for (Map<String, String> partitionSpec : partitionSpecs) {
+                if (hiveCatalog != null) {
+                    hiveCatalog.createPartitions(
+                            tableIdentifier, 
Collections.singletonList(partitionSpec));
+                }
+            }
 
         } catch (Exception e) {
             this.abort(commitMessages);
@@ -118,6 +161,16 @@ public class FormatTableCommit implements BatchTableCommit {
         }
     }
 
+    private LinkedHashMap<String, String> extractPartitionSpecFromPath(
+            Path partitionPath, List<String> partitionKeys) {
+        if (formatTablePartitionOnlyValueInPath) {
+            return PartitionPathUtils.extractPartitionSpecFromPathOnlyValue(
+                    partitionPath, partitionKeys);
+        } else {
+            return 
PartitionPathUtils.extractPartitionSpecFromPath(partitionPath);
+        }
+    }
+
     private static Path buildPartitionPath(
             String location,
             Map<String, String> partitionSpec,
diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
index bc99c35a4a..f225d5baf7 100644
--- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
+++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
@@ -516,13 +516,7 @@ public class HiveCatalog extends AbstractCatalog {
         String tagToPartitionField = table.coreOptions().tagToPartitionField();
         if (tagToPartitionField != null) {
             try {
-                List<Partition> partitions =
-                        clients.run(
-                                client ->
-                                        client.listPartitions(
-                                                identifier.getDatabaseName(),
-                                                identifier.getTableName(),
-                                                Short.MAX_VALUE));
+                List<Partition> partitions = listPartitionsFromHms(identifier);
                 return partitions.stream()
                         .map(
                                 part -> {
@@ -558,6 +552,17 @@ public class HiveCatalog extends AbstractCatalog {
         return listPartitionsFromFileSystem(table);
     }
 
+    @VisibleForTesting
+    public List<Partition> listPartitionsFromHms(Identifier identifier)
+            throws TException, InterruptedException {
+        return clients.run(
+                client ->
+                        client.listPartitions(
+                                identifier.getDatabaseName(),
+                                identifier.getTableName(),
+                                Short.MAX_VALUE));
+    }
+
     private List<Map<String, String>> removePartitionsExistsInOtherBranches(
             Identifier identifier, List<Map<String, String>> inputs) throws 
TableNotExistException {
         FileStoreTable mainTable =
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonHiveTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonHiveTestBase.scala
index ee423a0a59..3845a6b8b9 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonHiveTestBase.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonHiveTestBase.scala
@@ -31,8 +31,6 @@ import java.io.File
 
 class PaimonHiveTestBase extends PaimonSparkTestBase {
 
-  import PaimonHiveTestBase._
-
   protected lazy val tempHiveDBDir: File = Utils.createTempDir
 
   protected lazy val testHiveMetastore: TestHiveMetastore = new 
TestHiveMetastore
@@ -43,6 +41,8 @@ class PaimonHiveTestBase extends PaimonSparkTestBase {
 
   protected val hiveDbName: String = "test_hive"
 
+  val hiveUri: String = PaimonHiveTestBase.hiveUri
+
   /**
    * Add spark_catalog ([[SparkGenericCatalog]] in hive) and paimon_hive 
([[SparkCatalog]] in hive)
    * catalog
@@ -61,7 +61,7 @@ class PaimonHiveTestBase extends PaimonSparkTestBase {
   }
 
   override protected def beforeAll(): Unit = {
-    testHiveMetastore.start(hivePort)
+    testHiveMetastore.start(PaimonHiveTestBase.hivePort)
     super.beforeAll()
     spark.sql(s"USE $sparkCatalogName")
     spark.sql(s"CREATE DATABASE IF NOT EXISTS $hiveDbName")
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/FormatTableTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/FormatTableTestBase.scala
index 4a895b10b6..573f3c8899 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/FormatTableTestBase.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/FormatTableTestBase.scala
@@ -18,8 +18,9 @@
 
 package org.apache.paimon.spark.sql
 
-import org.apache.paimon.catalog.Identifier
+import org.apache.paimon.catalog.{DelegateCatalog, Identifier}
 import org.apache.paimon.fs.Path
+import org.apache.paimon.hive.HiveCatalog
 import org.apache.paimon.spark.PaimonHiveTestBase
 import org.apache.paimon.table.FormatTable
 import org.apache.paimon.utils.CompressUtils
@@ -45,6 +46,29 @@ abstract class FormatTableTestBase extends PaimonHiveTestBase {
     }
   }
 
+  test("Format table: check partition sync") {
+    val tableName = "t"
+    withTable(tableName) {
+      val hiveCatalog =
+        
paimonCatalog.asInstanceOf[DelegateCatalog].wrapped().asInstanceOf[HiveCatalog]
+      sql(
+        s"CREATE TABLE $tableName (f0 INT) USING CSV PARTITIONED BY (`ds` 
bigint) TBLPROPERTIES ('metastore.partitioned-table'='true')")
+      sql(s"INSERT INTO $tableName VALUES (1, 2023)")
+      var ds = 2023L
+      checkAnswer(sql(s"SELECT * FROM $tableName"), Seq(Row(1, ds)))
+      var partitions = 
hiveCatalog.listPartitionsFromHms(Identifier.create(hiveDbName, tableName))
+      assert(partitions.size == 0)
+      sql(s"DROP TABLE $tableName")
+      sql(
+        s"CREATE TABLE $tableName (f0 INT) USING CSV PARTITIONED BY (`ds` 
bigint) TBLPROPERTIES ('format-table.commit-hive-sync-url'='$hiveUri', 
'metastore.partitioned-table'='true')")
+      ds = 2024L
+      sql(s"INSERT INTO $tableName VALUES (1, $ds)")
+      checkAnswer(sql(s"SELECT * FROM $tableName"), Seq(Row(1, ds)))
+      partitions = 
hiveCatalog.listPartitionsFromHms(Identifier.create(hiveDbName, tableName))
+      assert(partitions.get(0).getValues.get(0).equals(ds.toString))
+    }
+  }
+
   test("Format table: write partitioned table") {
     for (
       (format, compression) <- Seq(

Reply via email to