This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 6a28e6ea22 [spark] support to report partition statistics for spark 
batch job (#5280)
6a28e6ea22 is described below

commit 6a28e6ea221ba6cc13e227518d3dec3dcd13bfdf
Author: WenjunMin <[email protected]>
AuthorDate: Thu Mar 13 18:52:36 2025 +0800

    [spark] support to report partition statistics for spark batch job (#5280)
---
 .../shortcodes/generated/core_configuration.html   |  6 +++
 .../generated/flink_connector_configuration.html   |  6 ---
 .../main/java/org/apache/paimon/CoreOptions.java   |  8 ++++
 .../paimon/utils}/PartitionStatisticsReporter.java |  4 +-
 .../utils}/PartitionStatisticsReporterTest.java    |  3 +-
 .../apache/paimon/flink/FlinkConnectorOptions.java |  8 ----
 .../sink/partition/ReportPartStatsListener.java    |  7 ++-
 .../paimon/spark/commands/PaimonSparkWriter.scala  | 50 +++++++++++++++++++++-
 8 files changed, 68 insertions(+), 24 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/core_configuration.html 
b/docs/layouts/shortcodes/generated/core_configuration.html
index 32b48fd246..746d83efbb 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -671,6 +671,12 @@ This config option does not affect the default filesystem 
metastore.</td>
             <td>Duration</td>
             <td>The expiration interval of a partition. A partition will be 
expired if its lifetime is over this value. Partition time is extracted from 
the partition value.</td>
         </tr>
+        <tr>
+            <td><h5>partition.idle-time-to-report-statistic</h5></td>
+            <td style="word-wrap: break-word;">1 h</td>
+            <td>Duration</td>
+            <td>Set a time duration when a partition has no new data after 
this time duration, start to report the partition statistics to hms.</td>
+        </tr>
         <tr>
             <td><h5>partition.legacy-name</h5></td>
             <td style="word-wrap: break-word;">true</td>
diff --git 
a/docs/layouts/shortcodes/generated/flink_connector_configuration.html 
b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
index 14a96eed0d..85cfdae55d 100644
--- a/docs/layouts/shortcodes/generated/flink_connector_configuration.html
+++ b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
@@ -86,12 +86,6 @@ under the License.
             <td>Duration</td>
             <td>Set a time duration when a partition has no new data after 
this time duration, mark the done status to indicate that the data is 
ready.</td>
         </tr>
-        <tr>
-            <td><h5>partition.idle-time-to-report-statistic</h5></td>
-            <td style="word-wrap: break-word;">1 h</td>
-            <td>Duration</td>
-            <td>Set a time duration when a partition has no new data after 
this time duration, start to report the partition statistics to hms.</td>
-        </tr>
         <tr>
             <td><h5>partition.time-interval</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java 
b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index df30d7aa17..0b3e24eb5d 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -1600,6 +1600,14 @@ public class CoreOptions implements Serializable {
                     .withDescription(
                             "Enable data file thin mode to avoid duplicate 
columns storage.");
 
+    public static final ConfigOption<Duration> 
PARTITION_IDLE_TIME_TO_REPORT_STATISTIC =
+            key("partition.idle-time-to-report-statistic")
+                    .durationType()
+                    .defaultValue(Duration.ofHours(1))
+                    .withDescription(
+                            "Set a time duration when a partition has no new 
data after this time duration, "
+                                    + "start to report the partition 
statistics to hms.");
+
     @ExcludeFromDocumentation("Only used internally to support materialized 
table")
     public static final ConfigOption<String> 
MATERIALIZED_TABLE_DEFINITION_QUERY =
             key("materialized-table.definition-query")
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionStatisticsReporter.java
 
b/paimon-core/src/main/java/org/apache/paimon/utils/PartitionStatisticsReporter.java
similarity index 96%
rename from 
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionStatisticsReporter.java
rename to 
paimon-core/src/main/java/org/apache/paimon/utils/PartitionStatisticsReporter.java
index f4cd8cfc53..2000c5432a 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionStatisticsReporter.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/utils/PartitionStatisticsReporter.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.flink.sink.partition;
+package org.apache.paimon.utils;
 
 import org.apache.paimon.Snapshot;
 import org.apache.paimon.fs.Path;
@@ -27,8 +27,6 @@ import org.apache.paimon.table.PartitionHandler;
 import org.apache.paimon.table.source.DataSplit;
 import org.apache.paimon.table.source.ScanMode;
 import org.apache.paimon.table.source.snapshot.SnapshotReader;
-import org.apache.paimon.utils.Preconditions;
-import org.apache.paimon.utils.SnapshotManager;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionStatisticsReporterTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/utils/PartitionStatisticsReporterTest.java
similarity index 98%
rename from 
paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionStatisticsReporterTest.java
rename to 
paimon-core/src/test/java/org/apache/paimon/utils/PartitionStatisticsReporterTest.java
index c888d6eda0..a47e485b9e 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionStatisticsReporterTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/utils/PartitionStatisticsReporterTest.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.flink.sink.partition;
+package org.apache.paimon.utils;
 
 import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.data.GenericRow;
@@ -33,7 +33,6 @@ import org.apache.paimon.table.sink.BatchTableWrite;
 import org.apache.paimon.table.sink.CommitMessage;
 import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.DataTypes;
-import org.apache.paimon.utils.PartitionPathUtils;
 
 import org.apache.paimon.shade.guava30.com.google.common.collect.Lists;
 import org.apache.paimon.shade.guava30.com.google.common.collect.Maps;
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
index 195b5163f8..a9eaddae69 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
@@ -372,14 +372,6 @@ public class FlinkConnectorOptions {
                             "You can specify time interval for partition, for 
example, "
                                     + "daily partition is '1 d', hourly 
partition is '1 h'.");
 
-    public static final ConfigOption<Duration> 
PARTITION_IDLE_TIME_TO_REPORT_STATISTIC =
-            key("partition.idle-time-to-report-statistic")
-                    .durationType()
-                    .defaultValue(Duration.ofHours(1))
-                    .withDescription(
-                            "Set a time duration when a partition has no new 
data after this time duration, "
-                                    + "start to report the partition 
statistics to hms.");
-
     public static final ConfigOption<String> CLUSTERING_COLUMNS =
             key("sink.clustering.by-columns")
                     .stringType()
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/ReportPartStatsListener.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/ReportPartStatsListener.java
index d8b24adde7..45e461405c 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/ReportPartStatsListener.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/ReportPartStatsListener.java
@@ -19,7 +19,6 @@
 package org.apache.paimon.flink.sink.partition;
 
 import org.apache.paimon.CoreOptions;
-import org.apache.paimon.flink.FlinkConnectorOptions;
 import org.apache.paimon.manifest.ManifestCommittable;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.table.FileStoreTable;
@@ -28,6 +27,7 @@ import org.apache.paimon.table.sink.CommitMessage;
 import org.apache.paimon.table.sink.CommitMessageImpl;
 import org.apache.paimon.utils.InternalRowPartitionComputer;
 import org.apache.paimon.utils.PartitionPathUtils;
+import org.apache.paimon.utils.PartitionStatisticsReporter;
 
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
@@ -145,8 +145,7 @@ public class ReportPartStatsListener implements 
PartitionListener {
 
         CoreOptions coreOptions = table.coreOptions();
         Options options = coreOptions.toConfiguration();
-        if 
(options.get(FlinkConnectorOptions.PARTITION_IDLE_TIME_TO_REPORT_STATISTIC).toMillis()
-                <= 0) {
+        if 
(options.get(CoreOptions.PARTITION_IDLE_TIME_TO_REPORT_STATISTIC).toMillis() <= 
0) {
             return Optional.empty();
         }
 
@@ -177,7 +176,7 @@ public class ReportPartStatsListener implements 
PartitionListener {
                         new PartitionStatisticsReporter(table, 
partitionHandler),
                         stateStore,
                         isRestored,
-                        
options.get(FlinkConnectorOptions.PARTITION_IDLE_TIME_TO_REPORT_STATISTIC)
+                        
options.get(CoreOptions.PARTITION_IDLE_TIME_TO_REPORT_STATISTIC)
                                 .toMillis()));
     }
 
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
index 478a44c886..559fe900b4 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.spark.commands
 
+import org.apache.paimon.CoreOptions
 import org.apache.paimon.CoreOptions.WRITE_ONLY
 import org.apache.paimon.codegen.CodeGenUtils
 import org.apache.paimon.crosspartition.{IndexBootstrap, KeyPartOrRow}
@@ -34,12 +35,13 @@ import org.apache.paimon.table.BucketMode._
 import org.apache.paimon.table.FileStoreTable
 import org.apache.paimon.table.sink._
 import org.apache.paimon.types.{RowKind, RowType}
-import org.apache.paimon.utils.SerializationUtils
+import org.apache.paimon.utils.{InternalRowPartitionComputer, 
PartitionPathUtils, PartitionStatisticsReporter, SerializationUtils}
 
 import org.apache.spark.{Partitioner, TaskContext}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
 import org.apache.spark.sql.functions._
+import org.slf4j.LoggerFactory
 
 import java.io.IOException
 import java.util.Collections.singletonMap
@@ -54,6 +56,8 @@ case class PaimonSparkWriter(table: FileStoreTable) {
 
   private lazy val bucketMode = table.bucketMode
 
+  private lazy val log = LoggerFactory.getLogger(classOf[PaimonSparkWriter])
+
   @transient private lazy val serializer = new CommitMessageSerializer
 
   val writeBuilder: BatchWriteBuilder = table.newBatchWriteBuilder()
@@ -316,6 +320,48 @@ case class PaimonSparkWriter(table: FileStoreTable) {
       .toSeq
   }
 
+  private def reportToHms(messages: Seq[CommitMessage]): Unit = {
+    val options = table.coreOptions()
+    val config = options.toConfiguration
+
+    if (
+      config.get(CoreOptions.PARTITION_IDLE_TIME_TO_REPORT_STATISTIC).toMillis 
<= 0 ||
+      table.partitionKeys.isEmpty ||
+      !options.partitionedTableInMetastore ||
+      table.catalogEnvironment.partitionHandler() == null
+    ) {
+      return
+    }
+
+    val partitionComputer = new InternalRowPartitionComputer(
+      options.partitionDefaultName,
+      table.schema.logicalPartitionType,
+      table.partitionKeys.toArray(new Array[String](0)),
+      options.legacyPartitionName()
+    )
+    val hmsReporter = new PartitionStatisticsReporter(
+      table,
+      table.catalogEnvironment.partitionHandler()
+    )
+
+    val partitions = messages.map(_.partition()).distinct
+    val currentTime = System.currentTimeMillis()
+    try {
+      partitions.foreach {
+        partition =>
+          val partitionPath = PartitionPathUtils.generatePartitionPath(
+            partitionComputer.generatePartValues(partition))
+          hmsReporter.report(partitionPath, currentTime)
+      }
+    } catch {
+      case e: Throwable =>
+        log.warn("Failed to report to hms", e)
+
+    } finally {
+      hmsReporter.close()
+    }
+  }
+
   def commit(commitMessages: Seq[CommitMessage]): Unit = {
     val tableCommit = writeBuilder.newCommit()
     try {
@@ -325,6 +371,8 @@ case class PaimonSparkWriter(table: FileStoreTable) {
     } finally {
       tableCommit.close()
     }
+
+    reportToHms(commitMessages)
   }
 
   /** Bootstrap and repartition for cross partition mode. */

Reply via email to