This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new e4543c4775 [spark] Add v2 write metrics (#6386)
e4543c4775 is described below

commit e4543c477599618848dc33b7a046e5f2a6bd5ac2
Author: Zouxxyy <[email protected]>
AuthorDate: Mon Oct 13 21:02:13 2025 +0800

    [spark] Add v2 write metrics (#6386)
---
 .../paimon/operation/metrics/CommitMetrics.java    |  46 +++---
 .../operation/metrics/WriterBufferMetric.java      |  10 +-
 .../paimon/spark/catalyst/Compatibility.scala      |   6 +
 .../paimon/spark/catalyst/Compatibility.scala      |   6 +
 .../org/apache/paimon/spark/PaimonMetrics.scala    | 169 ++++++++++++++-------
 .../paimon/spark/catalyst/Compatibility.scala      |   6 +
 .../paimon/spark/metric/SparkMetricRegistry.scala  |  66 ++++++--
 .../apache/paimon/spark/write/PaimonV2Write.scala  |  67 +++++++-
 .../apache/paimon/spark/sql/PaimonMetricTest.scala |  19 +++
 9 files changed, 297 insertions(+), 98 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/metrics/CommitMetrics.java b/paimon-core/src/main/java/org/apache/paimon/operation/metrics/CommitMetrics.java
index e64fc7be16..c89bae8c9a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/metrics/CommitMetrics.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/metrics/CommitMetrics.java
@@ -27,7 +27,7 @@ import org.apache.paimon.metrics.MetricRegistry;
 public class CommitMetrics {
 
     private static final int HISTOGRAM_WINDOW_SIZE = 100;
-    private static final String GROUP_NAME = "commit";
+    public static final String GROUP_NAME = "commit";
 
     private final MetricGroup metricGroup;
 
@@ -44,40 +44,36 @@ public class CommitMetrics {
     private Histogram durationHistogram;
     private CommitStats latestCommit;
 
-    @VisibleForTesting static final String LAST_COMMIT_DURATION = "lastCommitDuration";
-    @VisibleForTesting static final String COMMIT_DURATION = "commitDuration";
-    @VisibleForTesting static final String LAST_COMMIT_ATTEMPTS = "lastCommitAttempts";
-    @VisibleForTesting static final String LAST_TABLE_FILES_ADDED = "lastTableFilesAdded";
-    @VisibleForTesting static final String LAST_TABLE_FILES_DELETED = "lastTableFilesDeleted";
-    @VisibleForTesting static final String LAST_TABLE_FILES_APPENDED = "lastTableFilesAppended";
+    public static final String LAST_COMMIT_DURATION = "lastCommitDuration";
+    public static final String COMMIT_DURATION = "commitDuration";
+    public static final String LAST_COMMIT_ATTEMPTS = "lastCommitAttempts";
+    public static final String LAST_TABLE_FILES_ADDED = "lastTableFilesAdded";
+    public static final String LAST_TABLE_FILES_DELETED = "lastTableFilesDeleted";
+    public static final String LAST_TABLE_FILES_APPENDED = "lastTableFilesAppended";
 
-    @VisibleForTesting
-    static final String LAST_TABLE_FILES_COMMIT_COMPACTED = "lastTableFilesCommitCompacted";
+    public static final String LAST_TABLE_FILES_COMMIT_COMPACTED = "lastTableFilesCommitCompacted";
 
-    @VisibleForTesting
-    static final String LAST_CHANGELOG_FILES_APPENDED = "lastChangelogFilesAppended";
+    public static final String LAST_CHANGELOG_FILES_APPENDED = "lastChangelogFilesAppended";
 
-    @VisibleForTesting
-    static final String LAST_CHANGELOG_FILES_COMMIT_COMPACTED = "lastChangelogFileCommitCompacted";
+    public static final String LAST_CHANGELOG_FILES_COMMIT_COMPACTED =
+            "lastChangelogFileCommitCompacted";
 
-    @VisibleForTesting static final String LAST_GENERATED_SNAPSHOTS = "lastGeneratedSnapshots";
-    @VisibleForTesting static final String LAST_DELTA_RECORDS_APPENDED = "lastDeltaRecordsAppended";
+    public static final String LAST_GENERATED_SNAPSHOTS = "lastGeneratedSnapshots";
+    public static final String LAST_DELTA_RECORDS_APPENDED = "lastDeltaRecordsAppended";
 
-    @VisibleForTesting
-    static final String LAST_CHANGELOG_RECORDS_APPENDED = "lastChangelogRecordsAppended";
+    public static final String LAST_CHANGELOG_RECORDS_APPENDED = "lastChangelogRecordsAppended";
 
-    @VisibleForTesting
-    static final String LAST_DELTA_RECORDS_COMMIT_COMPACTED = "lastDeltaRecordsCommitCompacted";
+    public static final String LAST_DELTA_RECORDS_COMMIT_COMPACTED =
+            "lastDeltaRecordsCommitCompacted";
 
-    @VisibleForTesting
-    static final String LAST_CHANGELOG_RECORDS_COMMIT_COMPACTED =
+    public static final String LAST_CHANGELOG_RECORDS_COMMIT_COMPACTED =
             "lastChangelogRecordsCommitCompacted";
 
-    @VisibleForTesting static final String LAST_PARTITIONS_WRITTEN = "lastPartitionsWritten";
-    @VisibleForTesting static final String LAST_BUCKETS_WRITTEN = "lastBucketsWritten";
+    public static final String LAST_PARTITIONS_WRITTEN = "lastPartitionsWritten";
+    public static final String LAST_BUCKETS_WRITTEN = "lastBucketsWritten";
 
-    static final String LAST_COMPACTION_INPUT_FILE_SIZE = "lastCompactionInputFileSize";
-    static final String LAST_COMPACTION_OUTPUT_FILE_SIZE = "lastCompactionOutputFileSize";
+    public static final String LAST_COMPACTION_INPUT_FILE_SIZE = "lastCompactionInputFileSize";
+    public static final String LAST_COMPACTION_OUTPUT_FILE_SIZE = "lastCompactionOutputFileSize";
 
     private void registerGenericCommitMetrics() {
         metricGroup.gauge(
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/metrics/WriterBufferMetric.java b/paimon-core/src/main/java/org/apache/paimon/operation/metrics/WriterBufferMetric.java
index e4c676ab1e..fb5963178d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/metrics/WriterBufferMetric.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/metrics/WriterBufferMetric.java
@@ -29,11 +29,11 @@ import java.util.function.Supplier;
 /** Metrics for writer buffer. */
 public class WriterBufferMetric {
 
-    private static final String GROUP_NAME = "writerBuffer";
-    private static final String NUM_WRITERS = "numWriters";
-    private static final String BUFFER_PREEMPT_COUNT = "bufferPreemptCount";
-    private static final String USED_WRITE_BUFFER_SIZE = "usedWriteBufferSizeByte";
-    private static final String TOTAL_WRITE_BUFFER_SIZE = "totalWriteBufferSizeByte";
+    public static final String GROUP_NAME = "writerBuffer";
+    public static final String NUM_WRITERS = "numWriters";
+    public static final String BUFFER_PREEMPT_COUNT = "bufferPreemptCount";
+    public static final String USED_WRITE_BUFFER_SIZE = "usedWriteBufferSizeByte";
+    public static final String TOTAL_WRITE_BUFFER_SIZE = "totalWriteBufferSizeByte";
 
     private final MetricGroup metricGroup;
     private final AtomicInteger numWriters;
diff --git a/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/catalyst/Compatibility.scala b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/catalyst/Compatibility.scala
index 02e6efa6c6..619521c63c 100644
--- a/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/catalyst/Compatibility.scala
+++ b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/catalyst/Compatibility.scala
@@ -18,9 +18,11 @@
 
 package org.apache.paimon.spark.catalyst
 
+import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.expressions.{Cast, Expression}
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, V2WriteCommand}
 import org.apache.spark.sql.catalyst.trees.TreeNodeTag
+import org.apache.spark.sql.execution.ui.SQLPlanMetric
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.DataType
 
@@ -41,4 +43,8 @@ object Compatibility {
       ansiEnabled: Boolean = SQLConf.get.ansiEnabled): Cast = {
     Cast(child, dataType, timeZoneId, ansiEnabled)
   }
+
+  def getExecutionMetrics(spark: SparkSession, executionId: Long): Seq[SQLPlanMetric] = {
+    spark.sharedState.statusStore.execution(executionId).get.metrics.toSeq
+  }
 }
diff --git a/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/paimon/spark/catalyst/Compatibility.scala b/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/paimon/spark/catalyst/Compatibility.scala
index 02e6efa6c6..619521c63c 100644
--- a/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/paimon/spark/catalyst/Compatibility.scala
+++ b/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/paimon/spark/catalyst/Compatibility.scala
@@ -18,9 +18,11 @@
 
 package org.apache.paimon.spark.catalyst
 
+import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.expressions.{Cast, Expression}
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, V2WriteCommand}
 import org.apache.spark.sql.catalyst.trees.TreeNodeTag
+import org.apache.spark.sql.execution.ui.SQLPlanMetric
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.DataType
 
@@ -41,4 +43,8 @@ object Compatibility {
       ansiEnabled: Boolean = SQLConf.get.ansiEnabled): Cast = {
     Cast(child, dataType, timeZoneId, ansiEnabled)
   }
+
+  def getExecutionMetrics(spark: SparkSession, executionId: Long): Seq[SQLPlanMetric] = {
+    spark.sharedState.statusStore.execution(executionId).get.metrics.toSeq
+  }
 }
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonMetrics.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonMetrics.scala
index 3ef5bbdbcb..1ae6aeb6a2 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonMetrics.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonMetrics.scala
@@ -24,46 +24,32 @@ import org.apache.spark.sql.connector.metric.{CustomAvgMetric, CustomSumMetric,
 import java.text.DecimalFormat
 
 object PaimonMetrics {
-
+  // scan metrics
   val NUM_SPLITS = "numSplits"
-
   val SPLIT_SIZE = "splitSize"
-
   val AVG_SPLIT_SIZE = "avgSplitSize"
-
   val PLANNING_DURATION = "planningDuration"
-
   val SCANNED_MANIFESTS = "scannedManifests"
-
   val SKIPPED_TABLE_FILES = "skippedTableFiles"
-
   val RESULTED_TABLE_FILES = "resultedTableFiles"
-}
-
-// paimon's task metric
-sealed trait PaimonTaskMetric extends CustomTaskMetric
 
-case class PaimonNumSplitsTaskMetric(override val value: Long) extends PaimonTaskMetric {
-
-  override def name(): String = PaimonMetrics.NUM_SPLITS
+  // write metrics
+  val NUM_WRITERS = "numWriters"
 
+  // commit metrics
+  val COMMIT_DURATION = "commitDuration"
+  val APPENDED_TABLE_FILES = "appendedTableFiles"
+  val APPENDED_RECORDS = "appendedRecords"
+  val APPENDED_CHANGELOG_FILES = "appendedChangelogFiles"
+  val PARTITIONS_WRITTEN = "partitionsWritten"
+  val BUCKETS_WRITTEN = "bucketsWritten"
 }
 
-case class PaimonSplitSizeTaskMetric(override val value: Long) extends PaimonTaskMetric {
-
-  override def name(): String = PaimonMetrics.SPLIT_SIZE
-
-}
-
-case class PaimonAvgSplitSizeTaskMetric(override val value: Long) extends PaimonTaskMetric {
-
-  override def name(): String = PaimonMetrics.AVG_SPLIT_SIZE
-
-}
+// Base custom task metrics
+sealed trait PaimonTaskMetric extends CustomTaskMetric
 
-// paimon's sum metric
+// Base custom metrics
 sealed trait PaimonSumMetric extends CustomSumMetric {
-
   protected def aggregateTaskMetrics0(taskMetrics: Array[Long]): Long = {
     var sum: Long = 0L
     for (taskMetric <- taskMetrics) {
@@ -75,31 +61,9 @@ sealed trait PaimonSumMetric extends CustomSumMetric {
   override def aggregateTaskMetrics(taskMetrics: Array[Long]): String = {
     String.valueOf(aggregateTaskMetrics0(taskMetrics))
   }
-
-}
-
-case class PaimonNumSplitMetric() extends PaimonSumMetric {
-
-  override def name(): String = PaimonMetrics.NUM_SPLITS
-
-  override def description(): String = "number of splits read"
-
-}
-
-case class PaimonSplitSizeMetric() extends PaimonSumMetric {
-
-  override def name(): String = PaimonMetrics.SPLIT_SIZE
-
-  override def description(): String = "size of splits read"
-
-  override def aggregateTaskMetrics(taskMetrics: Array[Long]): String = {
-    PaimonUtils.bytesToString(aggregateTaskMetrics0(taskMetrics))
-  }
 }
 
-// paimon's avg metric
 sealed trait PaimonAvgMetric extends CustomAvgMetric {
-
   protected def aggregateTaskMetrics0(taskMetrics: Array[Long]): Double = {
     if (taskMetrics.length > 0) {
       var sum = 0L
@@ -116,23 +80,57 @@ sealed trait PaimonAvgMetric extends CustomAvgMetric {
     val average = aggregateTaskMetrics0(taskMetrics)
     new DecimalFormat("#0.000").format(average)
   }
+}
 
+sealed trait PaimonMinMaxMetric extends CustomAvgMetric {
+  def description0(): String
+
+  override def description(): String = s"${description0()} total (min, med, max)"
+
+  override def aggregateTaskMetrics(taskMetrics: Array[Long]): String = {
+    val total = taskMetrics.sum
+    val min = taskMetrics.min
+    val med = taskMetrics.sorted.apply(taskMetrics.length / 2)
+    val max = taskMetrics.max
+    s"$total ($min, $med, $max)"
+  }
 }
 
-case class PaimonAvgSplitSizeMetric() extends PaimonAvgMetric {
+// Scan metrics
+case class PaimonNumSplitMetric() extends PaimonSumMetric {
+  override def name(): String = PaimonMetrics.NUM_SPLITS
+  override def description(): String = "number of splits read"
+}
 
-  override def name(): String = PaimonMetrics.AVG_SPLIT_SIZE
+case class PaimonNumSplitsTaskMetric(override val value: Long) extends PaimonTaskMetric {
+  override def name(): String = PaimonMetrics.NUM_SPLITS
+}
 
-  override def description(): String = "avg size of splits read"
+case class PaimonSplitSizeMetric() extends PaimonSumMetric {
+  override def name(): String = PaimonMetrics.SPLIT_SIZE
+  override def description(): String = "size of splits read"
+  override def aggregateTaskMetrics(taskMetrics: Array[Long]): String = {
+    PaimonUtils.bytesToString(aggregateTaskMetrics0(taskMetrics))
+  }
+}
 
+case class PaimonSplitSizeTaskMetric(override val value: Long) extends PaimonTaskMetric {
+  override def name(): String = PaimonMetrics.SPLIT_SIZE
+}
+
+case class PaimonAvgSplitSizeMetric() extends PaimonAvgMetric {
+  override def name(): String = PaimonMetrics.AVG_SPLIT_SIZE
+  override def description(): String = "avg size of splits read"
   override def aggregateTaskMetrics(taskMetrics: Array[Long]): String = {
     val average = aggregateTaskMetrics0(taskMetrics).round
     PaimonUtils.bytesToString(average)
   }
+}
 
+case class PaimonAvgSplitSizeTaskMetric(override val value: Long) extends PaimonTaskMetric {
+  override def name(): String = PaimonMetrics.AVG_SPLIT_SIZE
 }
 
-// Metrics reported by driver
 case class PaimonPlanningDurationMetric() extends PaimonSumMetric {
   override def name(): String = PaimonMetrics.PLANNING_DURATION
   override def description(): String = "planing duration (ms)"
@@ -168,3 +166,68 @@ case class PaimonResultedTableFilesMetric() extends PaimonSumMetric {
 case class PaimonResultedTableFilesTaskMetric(value: Long) extends PaimonTaskMetric {
   override def name(): String = PaimonMetrics.RESULTED_TABLE_FILES
 }
+
+// Write metrics
+case class PaimonNumWritersMetric() extends PaimonMinMaxMetric {
+  override def name(): String = PaimonMetrics.NUM_WRITERS
+  override def description0(): String = "number of writers"
+}
+
+case class PaimonNumWritersTaskMetric(value: Long) extends PaimonTaskMetric {
+  override def name(): String = PaimonMetrics.NUM_WRITERS
+}
+
+// Commit metrics
+case class PaimonCommitDurationMetric() extends PaimonSumMetric {
+  override def name(): String = PaimonMetrics.COMMIT_DURATION
+  override def description(): String = "commit duration (ms)"
+}
+
+case class PaimonCommitDurationTaskMetric(value: Long) extends PaimonTaskMetric {
+  override def name(): String = PaimonMetrics.COMMIT_DURATION
+}
+
+case class PaimonAppendedTableFilesMetric() extends PaimonSumMetric {
+  override def name(): String = PaimonMetrics.APPENDED_TABLE_FILES
+  override def description(): String = "number of appended table files"
+}
+
+case class PaimonAppendedTableFilesTaskMetric(value: Long) extends PaimonTaskMetric {
+  override def name(): String = PaimonMetrics.APPENDED_TABLE_FILES
+}
+
+case class PaimonAppendedRecordsMetric() extends PaimonSumMetric {
+  override def name(): String = PaimonMetrics.APPENDED_RECORDS
+  override def description(): String = "number of appended records"
+}
+
+case class PaimonAppendedRecordsTaskMetric(value: Long) extends PaimonTaskMetric {
+  override def name(): String = PaimonMetrics.APPENDED_RECORDS
+}
+
+case class PaimonAppendedChangelogFilesMetric() extends PaimonSumMetric {
+  override def name(): String = PaimonMetrics.APPENDED_CHANGELOG_FILES
+  override def description(): String = "number of appended changelog files"
+}
+
+case class PaimonAppendedChangelogFilesTaskMetric(value: Long) extends PaimonTaskMetric {
+  override def name(): String = PaimonMetrics.APPENDED_CHANGELOG_FILES
+}
+
+case class PaimonPartitionsWrittenMetric() extends PaimonSumMetric {
+  override def name(): String = PaimonMetrics.PARTITIONS_WRITTEN
+  override def description(): String = "number of partitions written"
+}
+
+case class PaimonPartitionsWrittenTaskMetric(value: Long) extends PaimonTaskMetric {
+  override def name(): String = PaimonMetrics.PARTITIONS_WRITTEN
+}
+
+case class PaimonBucketsWrittenMetric() extends PaimonSumMetric {
+  override def name(): String = PaimonMetrics.BUCKETS_WRITTEN
+  override def description(): String = "number of buckets written"
+}
+
+case class PaimonBucketsWrittenTaskMetric(value: Long) extends PaimonTaskMetric {
+  override def name(): String = PaimonMetrics.BUCKETS_WRITTEN
+}
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/Compatibility.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/Compatibility.scala
index 1fc260fc19..443024058e 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/Compatibility.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/Compatibility.scala
@@ -18,9 +18,11 @@
 
 package org.apache.paimon.spark.catalyst
 
+import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.expressions.{Cast, Expression}
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, V2WriteCommand}
 import org.apache.spark.sql.catalyst.trees.TreeNodeTag
+import org.apache.spark.sql.execution.ui.SQLPlanMetric
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.DataType
 
@@ -41,4 +43,8 @@ object Compatibility {
       ansiEnabled: Boolean = SQLConf.get.ansiEnabled): Cast = {
     Cast(child, dataType, timeZoneId, ansiEnabled)
   }
+
+  def getExecutionMetrics(spark: SparkSession, executionId: Long): Seq[SQLPlanMetric] = {
+    spark.sharedState.statusStore.execution(executionId).get.metrics.toSeq
+  }
 }
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/metric/SparkMetricRegistry.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/metric/SparkMetricRegistry.scala
index 3d858a623f..11fe639f9c 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/metric/SparkMetricRegistry.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/metric/SparkMetricRegistry.scala
@@ -18,13 +18,13 @@
 
 package org.apache.paimon.spark.metric
 
-import org.apache.paimon.metrics.{Gauge, MetricGroup, MetricGroupImpl, MetricRegistry}
-import org.apache.paimon.operation.metrics.ScanMetrics
-import org.apache.paimon.spark.{PaimonPlanningDurationTaskMetric, PaimonResultedTableFilesTaskMetric, PaimonScannedManifestsTaskMetric, PaimonSkippedTableFilesTaskMetric}
+import org.apache.paimon.metrics.{Gauge, Metric, MetricGroup, MetricGroupImpl, MetricRegistry}
+import org.apache.paimon.operation.metrics.{CommitMetrics, ScanMetrics, WriterBufferMetric}
+import org.apache.paimon.spark._
 
 import org.apache.spark.sql.connector.metric.CustomTaskMetric
 
-import java.util
+import java.util.{Map => JMap}
 
 import scala.collection.mutable
 
@@ -34,7 +34,7 @@ case class SparkMetricRegistry() extends MetricRegistry {
 
   override def createMetricGroup(
       groupName: String,
-      variables: util.Map[String, String]): MetricGroup = {
+      variables: JMap[String, String]): MetricGroup = {
     val metricGroup = new MetricGroupImpl(groupName, variables)
     metricGroups.put(groupName, metricGroup)
     metricGroup
@@ -44,15 +44,61 @@ case class SparkMetricRegistry() extends MetricRegistry {
     metricGroups.get(ScanMetrics.GROUP_NAME) match {
       case Some(group) =>
         val metrics = group.getMetrics
-        def gaugeLong(key: String): Long = metrics.get(key).asInstanceOf[Gauge[Long]].getValue
         Array(
-          PaimonPlanningDurationTaskMetric(gaugeLong(ScanMetrics.LAST_SCAN_DURATION)),
-          PaimonScannedManifestsTaskMetric(gaugeLong(ScanMetrics.LAST_SCANNED_MANIFESTS)),
-          PaimonSkippedTableFilesTaskMetric(gaugeLong(ScanMetrics.LAST_SCAN_SKIPPED_TABLE_FILES)),
-          PaimonResultedTableFilesTaskMetric(gaugeLong(ScanMetrics.LAST_SCAN_RESULTED_TABLE_FILES))
+          PaimonPlanningDurationTaskMetric(gauge[Long](metrics, ScanMetrics.LAST_SCAN_DURATION)),
+          PaimonScannedManifestsTaskMetric(
+            gauge[Long](metrics, ScanMetrics.LAST_SCANNED_MANIFESTS)),
+          PaimonSkippedTableFilesTaskMetric(
+            gauge[Long](metrics, ScanMetrics.LAST_SCAN_SKIPPED_TABLE_FILES)),
+          PaimonResultedTableFilesTaskMetric(
+            gauge[Long](metrics, ScanMetrics.LAST_SCAN_RESULTED_TABLE_FILES))
         )
       case None =>
         Array.empty
     }
   }
+
+  def buildSparkWriteMetrics(): Array[CustomTaskMetric] = {
+    metricGroups.get(WriterBufferMetric.GROUP_NAME) match {
+      case Some(group) =>
+        val metrics = group.getMetrics
+        Array(
+          PaimonNumWritersTaskMetric(gauge[Int](metrics, WriterBufferMetric.NUM_WRITERS))
+        )
+      case None =>
+        Array.empty
+    }
+  }
+
+  def buildSparkCommitMetrics(): Array[CustomTaskMetric] = {
+    metricGroups.get(CommitMetrics.GROUP_NAME) match {
+      case Some(group) =>
+        val metrics = group.getMetrics
+        Array(
+          PaimonCommitDurationTaskMetric(gauge[Long](metrics, CommitMetrics.LAST_COMMIT_DURATION)),
+          PaimonAppendedTableFilesTaskMetric(
+            gauge[Long](metrics, CommitMetrics.LAST_TABLE_FILES_APPENDED)),
+          PaimonAppendedRecordsTaskMetric(
+            gauge[Long](metrics, CommitMetrics.LAST_DELTA_RECORDS_APPENDED)),
+          PaimonAppendedChangelogFilesTaskMetric(
+            gauge[Long](metrics, CommitMetrics.LAST_CHANGELOG_FILES_APPENDED)),
+          PaimonPartitionsWrittenTaskMetric(
+            gauge[Long](metrics, CommitMetrics.LAST_PARTITIONS_WRITTEN)),
+          PaimonBucketsWrittenTaskMetric(gauge[Long](metrics, CommitMetrics.LAST_BUCKETS_WRITTEN))
+        )
+      case None =>
+        Array.empty
+    }
+  }
+
+  private def gauge[T](metrics: JMap[String, Metric], key: String): T = {
+    metrics.get(key) match {
+      case null =>
+        throw new NoSuchElementException(s"Metric key '$key' not found")
+      case g: Gauge[_] =>
+        g.getValue.asInstanceOf[T]
+      case m =>
+        throw new ClassCastException(s"Expected Gauge, but got ${m.getClass.getName}")
+    }
+  }
 }
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonV2Write.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonV2Write.scala
index 62a383eb7c..4589b42e8a 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonV2Write.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonV2Write.scala
@@ -20,16 +20,23 @@ package org.apache.paimon.spark.write
 
 import org.apache.paimon.CoreOptions
 import org.apache.paimon.options.Options
-import org.apache.paimon.spark.{SparkInternalRowWrapper, SparkUtils}
+import org.apache.paimon.spark.{PaimonAppendedChangelogFilesMetric, PaimonAppendedRecordsMetric, PaimonAppendedTableFilesMetric, PaimonBucketsWrittenMetric, PaimonCommitDurationMetric, PaimonNumWritersMetric, PaimonPartitionsWrittenMetric, SparkInternalRowWrapper, SparkUtils}
+import org.apache.paimon.spark.catalyst.Compatibility
 import org.apache.paimon.spark.commands.SchemaHelper
+import org.apache.paimon.spark.metric.SparkMetricRegistry
 import org.apache.paimon.table.FileStoreTable
 import org.apache.paimon.table.sink.{BatchWriteBuilder, CommitMessage, CommitMessageSerializer, TableWriteImpl}
 
 import org.apache.spark.internal.Logging
+import org.apache.spark.sql.PaimonSparkSession
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.connector.distributions.Distribution
 import org.apache.spark.sql.connector.expressions.SortOrder
+import org.apache.spark.sql.connector.metric.{CustomMetric, CustomTaskMetric}
 import org.apache.spark.sql.connector.write._
+import org.apache.spark.sql.execution.SQLExecution
+import org.apache.spark.sql.execution.metric.SQLMetrics
+import org.apache.spark.sql.execution.ui.SQLPlanMetric
 import org.apache.spark.sql.types.StructType
 
 import java.io.{IOException, UncheckedIOException}
@@ -74,6 +81,20 @@ class PaimonV2Write(
   override def toBatch: BatchWrite =
     PaimonBatchWrite(table, writeSchema, dataSchema, overwritePartitions)
 
+  override def supportedCustomMetrics(): Array[CustomMetric] = {
+    Array(
+      // write metrics
+      PaimonNumWritersMetric(),
+      // commit metrics
+      PaimonCommitDurationMetric(),
+      PaimonAppendedTableFilesMetric(),
+      PaimonAppendedRecordsMetric(),
+      PaimonAppendedChangelogFilesMetric(),
+      PaimonPartitionsWrittenMetric(),
+      PaimonBucketsWrittenMetric()
+    )
+  }
+
   override def toString: String = {
     val overwriteDynamicStr = if (overwriteDynamic) {
       ", overwriteDynamic=true"
@@ -87,6 +108,8 @@ class PaimonV2Write(
     }
     s"PaimonWrite(table=${table.fullName()}$overwriteDynamicStr$overwritePartitionsStr)"
   }
+
+  override def description(): String = toString
 }
 
 private case class PaimonBatchWrite(
@@ -97,6 +120,8 @@ private case class PaimonBatchWrite(
   extends BatchWrite
   with WriteHelper {
 
+  private val metricRegistry = SparkMetricRegistry()
+
   private val batchWriteBuilder = {
     val builder = table.newBatchWriteBuilder()
     overwritePartitions.foreach(partitions => builder.withOverwrite(partitions.asJava))
@@ -114,6 +139,7 @@ private case class PaimonBatchWrite(
   override def commit(messages: Array[WriterCommitMessage]): Unit = {
     logInfo(s"Committing to table ${table.name()}")
     val batchTableCommit = batchWriteBuilder.newCommit()
+    batchTableCommit.withMetricRegistry(metricRegistry)
 
     val commitMessages = messages
       .collect {
@@ -131,12 +157,31 @@ private case class PaimonBatchWrite(
     } finally {
       batchTableCommit.close()
     }
+    postDriverMetrics()
     postCommit(commitMessages)
   }
 
   override def abort(messages: Array[WriterCommitMessage]): Unit = {
     // TODO clean uncommitted files
   }
+
+  // Spark has supported v2 write driver metrics since 4.0, see https://github.com/apache/spark/pull/48573
+  // To ensure compatibility with 3.x, manually post driver metrics here instead of using Spark's API.
+  private def postDriverMetrics(): Unit = {
+    val spark = PaimonSparkSession.active
+    // todo: find a more suitable way to get metrics.
+    val commitMetrics = metricRegistry.buildSparkCommitMetrics()
+    val executionId = spark.sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
+    val executionMetrics = Compatibility.getExecutionMetrics(spark, executionId.toLong)
+    val metricUpdates = executionMetrics.flatMap {
+      m =>
+        commitMetrics.find(x => m.metricType.toLowerCase.contains(x.name.toLowerCase)) match {
+          case Some(customTaskMetric) => Some((m.accumulatorId, customTaskMetric.value()))
+          case None => None
+        }
+    }
+    SQLMetrics.postDriverMetricsUpdatedByValue(spark.sparkContext, executionId, metricUpdates)
+  }
 }
 
 private case class WriterFactory(
@@ -147,13 +192,12 @@ private case class WriterFactory(
   extends DataWriterFactory {
 
   override def createWriter(partitionId: Int, taskId: Long): DataWriter[InternalRow] = {
-    val batchTableWrite = batchWriteBuilder.newWrite().asInstanceOf[TableWriteImpl[InternalRow]]
-    PaimonDataWriter(batchTableWrite, writeSchema, dataSchema, fullCompactionDeltaCommits)
+    PaimonDataWriter(batchWriteBuilder, writeSchema, dataSchema, fullCompactionDeltaCommits)
   }
 }
 
 private case class PaimonDataWriter(
-    write: TableWriteImpl[InternalRow],
+    writeBuilder: BatchWriteBuilder,
     writeSchema: StructType,
     dataSchema: StructType,
     fullCompactionDeltaCommits: Option[Int],
@@ -162,7 +206,16 @@ private case class PaimonDataWriter(
   with DataWriteHelper {
 
   private val ioManager = SparkUtils.createIOManager()
-  write.withIOManager(ioManager)
+
+  private val metricRegistry = SparkMetricRegistry()
+
+  val write: TableWriteImpl[InternalRow] = {
+    writeBuilder
+      .newWrite()
+      .withIOManager(ioManager)
+      .withMetricRegistry(metricRegistry)
+      .asInstanceOf[TableWriteImpl[InternalRow]]
+  }
 
   private val rowConverter: InternalRow => SparkInternalRowWrapper = {
     val numFields = writeSchema.fields.length
@@ -194,6 +247,10 @@ private case class PaimonDataWriter(
       case e: Exception => throw new RuntimeException(e)
     }
   }
+
+  override def currentMetricsValues(): Array[CustomTaskMetric] = {
+    metricRegistry.buildSparkWriteMetrics()
+  }
 }
 
 class TaskCommit private (
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonMetricTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonMetricTest.scala
index 9656a3caa6..58883879c7 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonMetricTest.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonMetricTest.scala
@@ -26,6 +26,7 @@ import org.apache.paimon.table.source.DataSplit
 import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
 import org.apache.spark.sql.PaimonUtils.createDataset
 import org.apache.spark.sql.connector.metric.CustomTaskMetric
+import org.apache.spark.sql.execution.CommandResultExec
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation
 import org.junit.jupiter.api.Assertions
 
@@ -110,6 +111,24 @@ class PaimonMetricTest extends PaimonSparkTestBase with ScanPlanHelper {
     Assertions.assertTrue(bytesWritten > 0)
   }
 
+  test(s"Paimon Metric: v2 write metric") {
+    withSparkSQLConf("spark.paimon.write.use-v2-write" -> "true") {
+      sql("CREATE TABLE T (id INT, name STRING, pt STRING) PARTITIONED BY (pt)")
+      val df = sql(s"INSERT INTO T VALUES (1, 'a', 'p1'), (2, 'b', 'p2')")
+      val metrics =
+        df.queryExecution.executedPlan.asInstanceOf[CommandResultExec].commandPhysicalPlan.metrics
+      val statusStore = spark.sharedState.statusStore
+      val lastExecId = statusStore.executionsList().last.executionId
+      val executionMetrics = statusStore.executionMetrics(lastExecId)
+
+      assert(executionMetrics(metrics("appendedTableFiles").id) == "2")
+      assert(executionMetrics(metrics("appendedRecords").id) == "2")
+      assert(executionMetrics(metrics("appendedChangelogFiles").id) == "0")
+      assert(executionMetrics(metrics("partitionsWritten").id) == "2")
+      assert(executionMetrics(metrics("bucketsWritten").id) == "2")
+    }
+  }
+
   def metric(metrics: Array[CustomTaskMetric], name: String): Long = {
     metrics.find(_.name() == name).get.value()
   }
