This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 1258f5ff35 [spark] Fix the display of V2 delete metrics (#7114)
1258f5ff35 is described below

commit 1258f5ff35aa946338465be6d8b0e950f8eabd91
Author: Zouxxyy <[email protected]>
AuthorDate: Sun Jan 25 12:03:06 2026 +0800

    [spark] Fix the display of V2 delete metrics (#7114)
---
 .../paimon/operation/metrics/CommitStats.java      | 21 ++++--
 .../org/apache/paimon/spark/PaimonMetrics.scala    | 38 ++++++----
 .../paimon/spark/metric/SparkMetricRegistry.scala  |  8 ++-
 .../apache/paimon/spark/write/PaimonV2Write.scala  | 29 ++++++--
 .../apache/paimon/spark/sql/PaimonMetricTest.scala | 83 +++++++++++++++++-----
 5 files changed, 131 insertions(+), 48 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/metrics/CommitStats.java b/paimon-core/src/main/java/org/apache/paimon/operation/metrics/CommitStats.java
index 657bd6aae1..11d6854270 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/metrics/CommitStats.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/metrics/CommitStats.java
@@ -24,7 +24,6 @@ import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.manifest.FileKind;
 import org.apache.paimon.manifest.ManifestEntry;
 
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.LinkedHashMap;
@@ -38,8 +37,8 @@ import java.util.stream.Collectors;
 public class CommitStats {
     private final long duration;
     private final int attempts;
-    private final long tableFilesAdded;
     private final long tableFilesAppended;
+    private final long tableFilesAdded;
     private final long tableFilesDeleted;
     private final long changelogFilesAppended;
     private final long compactionInputFileSize;
@@ -63,19 +62,29 @@ public class CommitStats {
             long commitDuration,
             int generatedSnapshots,
             int attempts) {
-        List<ManifestEntry> addedTableFiles = new ArrayList<>(appendTableFiles);
+        List<ManifestEntry> addedTableFiles =
+                appendTableFiles.stream()
+                        .filter(f -> FileKind.ADD.equals(f.kind()))
+                        .collect(Collectors.toList());
+        List<ManifestEntry> deletedTableFiles =
+                appendTableFiles.stream()
+                        .filter(f -> FileKind.DELETE.equals(f.kind()))
+                        .collect(Collectors.toList());
+
         List<ManifestEntry> compactAfterFiles =
                 compactTableFiles.stream()
                         .filter(f -> FileKind.ADD.equals(f.kind()))
                         .collect(Collectors.toList());
         addedTableFiles.addAll(compactAfterFiles);
-        List<ManifestEntry> deletedTableFiles =
+
+        List<ManifestEntry> compactionInputFiles =
                 compactTableFiles.stream()
                         .filter(f -> FileKind.DELETE.equals(f.kind()))
                         .collect(Collectors.toList());
+        deletedTableFiles.addAll(compactionInputFiles);
 
         this.compactionInputFileSize =
-                deletedTableFiles.stream()
+                compactionInputFiles.stream()
                         .map(ManifestEntry::file)
                         .map(DataFileMeta::fileSize)
                         .reduce(Long::sum)
@@ -86,8 +95,8 @@ public class CommitStats {
                         .map(DataFileMeta::fileSize)
                         .reduce(Long::sum)
                         .orElse(0L);
-        this.tableFilesAdded = addedTableFiles.size();
         this.tableFilesAppended = appendTableFiles.size();
+        this.tableFilesAdded = addedTableFiles.size();
         this.tableFilesDeleted = deletedTableFiles.size();
         this.tableFilesCompacted = compactTableFiles.size();
         this.changelogFilesAppended = appendChangelogFiles.size();
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonMetrics.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonMetrics.scala
index dadd43f5da..c8ceba357b 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonMetrics.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonMetrics.scala
@@ -22,7 +22,7 @@ import org.apache.spark.sql.PaimonUtils
 import org.apache.spark.sql.connector.metric.{CustomMetric, CustomTaskMetric}
 
 object PaimonMetrics {
-  // scan metrics
+  // read metrics
   val NUM_SPLITS = "numSplits"
   val PARTITION_SIZE = "partitionSize"
   val READ_BATCH_TIME = "readBatchTime"
@@ -37,8 +37,9 @@ object PaimonMetrics {
 
   // commit metrics
   val COMMIT_DURATION = "commitDuration"
-  val APPENDED_TABLE_FILES = "appendedTableFiles"
-  val APPENDED_RECORDS = "appendedRecords"
+  val ADDED_TABLE_FILES = "addedTableFiles"
+  val DELETED_TABLE_FILES = "deletedTableFiles"
+  val INSERTED_RECORDS = "insertedRecords"
   val APPENDED_CHANGELOG_FILES = "appendedChangelogFiles"
   val PARTITIONS_WRITTEN = "partitionsWritten"
   val BUCKETS_WRITTEN = "bucketsWritten"
@@ -90,7 +91,7 @@ sealed trait PaimonSummaryMetric extends PaimonCustomMetric {
 sealed trait PaimonSizeSummaryMetric extends PaimonSummaryMetric with PaimonSizeMetric
 sealed trait PaimonTimingSummaryMetric extends PaimonSummaryMetric with PaimonTimingMetric
 
-// Scan metrics
+// Read metrics
 case class PaimonNumSplitMetric() extends PaimonSumMetric {
   override def name(): String = PaimonMetrics.NUM_SPLITS
   override def description(): String = "number of splits read"
@@ -183,22 +184,31 @@ case class PaimonCommitDurationTaskMetric(value: Long) extends PaimonTaskMetric
   override def name(): String = PaimonMetrics.COMMIT_DURATION
 }
 
-case class PaimonAppendedTableFilesMetric() extends PaimonSumMetric {
-  override def name(): String = PaimonMetrics.APPENDED_TABLE_FILES
-  override def description(): String = "number of appended table files"
+case class PaimonAddedTableFilesMetric() extends PaimonSumMetric {
+  override def name(): String = PaimonMetrics.ADDED_TABLE_FILES
+  override def description(): String = "number of added table files"
 }
 
-case class PaimonAppendedTableFilesTaskMetric(value: Long) extends PaimonTaskMetric {
-  override def name(): String = PaimonMetrics.APPENDED_TABLE_FILES
+case class PaimonAddedTableFilesTaskMetric(value: Long) extends PaimonTaskMetric {
+  override def name(): String = PaimonMetrics.ADDED_TABLE_FILES
 }
 
-case class PaimonAppendedRecordsMetric() extends PaimonSumMetric {
-  override def name(): String = PaimonMetrics.APPENDED_RECORDS
-  override def description(): String = "number of appended records"
+case class PaimonDeletedTableFilesMetric() extends PaimonSumMetric {
+  override def name(): String = PaimonMetrics.DELETED_TABLE_FILES
+  override def description(): String = "number of deleted table files"
 }
 
-case class PaimonAppendedRecordsTaskMetric(value: Long) extends PaimonTaskMetric {
-  override def name(): String = PaimonMetrics.APPENDED_RECORDS
+case class PaimonDeletedTableFilesTaskMetric(value: Long) extends PaimonTaskMetric {
+  override def name(): String = PaimonMetrics.DELETED_TABLE_FILES
+}
+
+case class PaimonInsertedRecordsMetric() extends PaimonSumMetric {
+  override def name(): String = PaimonMetrics.INSERTED_RECORDS
+  override def description(): String = "number of inserted records"
+}
+
+case class PaimonInsertedRecordsTaskMetric(value: Long) extends PaimonTaskMetric {
+  override def name(): String = PaimonMetrics.INSERTED_RECORDS
 }
 
 case class PaimonAppendedChangelogFilesMetric() extends PaimonSumMetric {
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/metric/SparkMetricRegistry.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/metric/SparkMetricRegistry.scala
index 24b8b3dbc7..9aeeed7a03 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/metric/SparkMetricRegistry.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/metric/SparkMetricRegistry.scala
@@ -78,9 +78,11 @@ case class SparkMetricRegistry() extends MetricRegistry {
         val metrics = group.getMetrics
         Array(
           PaimonCommitDurationTaskMetric(gauge[Long](metrics, CommitMetrics.LAST_COMMIT_DURATION)),
-          PaimonAppendedTableFilesTaskMetric(
-            gauge[Long](metrics, CommitMetrics.LAST_TABLE_FILES_APPENDED)),
-          PaimonAppendedRecordsTaskMetric(
+          PaimonAddedTableFilesTaskMetric(
+            gauge[Long](metrics, CommitMetrics.LAST_TABLE_FILES_ADDED)),
+          PaimonDeletedTableFilesTaskMetric(
+            gauge[Long](metrics, CommitMetrics.LAST_TABLE_FILES_DELETED)),
+          PaimonInsertedRecordsTaskMetric(
             gauge[Long](metrics, CommitMetrics.LAST_DELTA_RECORDS_APPENDED)),
           PaimonAppendedChangelogFilesTaskMetric(
             gauge[Long](metrics, CommitMetrics.LAST_CHANGELOG_FILES_APPENDED)),
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonV2Write.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonV2Write.scala
index e2ead2f726..e4676ae4af 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonV2Write.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonV2Write.scala
@@ -18,10 +18,12 @@
 
 package org.apache.paimon.spark.write
 
+import org.apache.paimon.CoreOptions.ChangelogProducer
 import org.apache.paimon.options.Options
 import org.apache.paimon.spark._
 import org.apache.paimon.spark.commands.SchemaHelper
 import org.apache.paimon.spark.rowops.PaimonCopyOnWriteScan
+import org.apache.paimon.table.BucketMode.BUCKET_UNAWARE
 import org.apache.paimon.table.FileStoreTable
 
 import org.apache.spark.internal.Logging
@@ -31,6 +33,8 @@ import org.apache.spark.sql.connector.metric.CustomMetric
 import org.apache.spark.sql.connector.write._
 import org.apache.spark.sql.types.StructType
 
+import scala.collection.mutable
+
 class PaimonV2Write(
     override val originTable: FileStoreTable,
     overwritePartitions: Option[Map[String, String]],
@@ -62,17 +66,30 @@ class PaimonV2Write(
   }
 
   override def supportedCustomMetrics(): Array[CustomMetric] = {
-    Array(
+    val buffer = mutable.ArrayBuffer[CustomMetric](
       // write metrics
       PaimonNumWritersMetric(),
       // commit metrics
       PaimonCommitDurationMetric(),
-      PaimonAppendedTableFilesMetric(),
-      PaimonAppendedRecordsMetric(),
-      PaimonAppendedChangelogFilesMetric(),
-      PaimonPartitionsWrittenMetric(),
-      PaimonBucketsWrittenMetric()
+      PaimonAddedTableFilesMetric()
     )
+    if (copyOnWriteScan.isEmpty) {
+      // todo: support record metrics for row level ops
+      buffer += PaimonInsertedRecordsMetric()
+    }
+    if (copyOnWriteScan.nonEmpty) {
+      buffer += PaimonDeletedTableFilesMetric()
+    }
+    if (!coreOptions.changelogProducer().equals(ChangelogProducer.NONE)) {
+      buffer += PaimonAppendedChangelogFilesMetric()
+    }
+    if (!table.partitionKeys().isEmpty) {
+      buffer += PaimonPartitionsWrittenMetric()
+    }
+    if (!table.bucketMode().equals(BUCKET_UNAWARE)) {
+      buffer += PaimonBucketsWrittenMetric()
+    }
+    buffer.toArray
   }
 
   override def toString: String = {
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonMetricTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonMetricTest.scala
index b75ac07d7c..66ca1cd803 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonMetricTest.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonMetricTest.scala
@@ -25,10 +25,12 @@ import org.apache.paimon.spark.util.ScanPlanHelper
 import org.apache.paimon.table.source.DataSplit
 
 import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
+import org.apache.spark.sql.DataFrame
 import org.apache.spark.sql.PaimonUtils.createDataset
 import org.apache.spark.sql.connector.metric.CustomTaskMetric
 import org.apache.spark.sql.execution.CommandResultExec
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation
+import org.apache.spark.sql.execution.metric.SQLMetric
 import org.junit.jupiter.api.Assertions
 
 class PaimonMetricTest extends PaimonSparkTestBase with ScanPlanHelper {
@@ -127,17 +129,12 @@ class PaimonMetricTest extends PaimonSparkTestBase with ScanPlanHelper {
     withSparkSQLConf("spark.paimon.write.use-v2-write" -> "true") {
       sql("CREATE TABLE T (id INT, name STRING, pt STRING) PARTITIONED BY 
(pt)")
       val df = sql(s"INSERT INTO T VALUES (1, 'a', 'p1'), (2, 'b', 'p2')")
-      val metrics =
-        df.queryExecution.executedPlan.asInstanceOf[CommandResultExec].commandPhysicalPlan.metrics
-      val statusStore = spark.sharedState.statusStore
-      val lastExecId = statusStore.executionsList().last.executionId
-      val executionMetrics = statusStore.executionMetrics(lastExecId)
-
-      assert(executionMetrics(metrics("appendedTableFiles").id) == "2")
-      assert(executionMetrics(metrics("appendedRecords").id) == "2")
-      assert(executionMetrics(metrics("appendedChangelogFiles").id) == "0")
+      val metrics = commandMetrics(df)
+      val executionMetrics = lastExecutionMetrics
+
+      assert(executionMetrics(metrics("addedTableFiles").id) == "2")
+      assert(executionMetrics(metrics("insertedRecords").id) == "2")
       assert(executionMetrics(metrics("partitionsWritten").id) == "2")
-      assert(executionMetrics(metrics("bucketsWritten").id) == "2")
     }
   }
 
@@ -145,20 +142,68 @@ class PaimonMetricTest extends PaimonSparkTestBase with ScanPlanHelper {
     withSparkSQLConf("spark.paimon.write.use-v2-write" -> "true") {
       sql("CREATE TABLE T (id INT, pt INT) PARTITIONED BY (pt)")
       val df = sql(s"INSERT INTO T SELECT /*+ REPARTITION(1) */ id, id FROM 
range(1, 10)")
-      val metrics =
-        df.queryExecution.executedPlan.asInstanceOf[CommandResultExec].commandPhysicalPlan.metrics
-      val statusStore = spark.sharedState.statusStore
-      val lastExecId = statusStore.executionsList().last.executionId
-      val executionMetrics = statusStore.executionMetrics(lastExecId)
-
-      assert(executionMetrics(metrics("appendedTableFiles").id) == "9")
-      assert(executionMetrics(metrics("appendedRecords").id) == "9")
+      val metrics = commandMetrics(df)
+      val executionMetrics = lastExecutionMetrics
+
+      assert(executionMetrics(metrics("addedTableFiles").id) == "9")
+      assert(executionMetrics(metrics("insertedRecords").id) == "9")
       assert(executionMetrics(metrics("partitionsWritten").id) == "9")
-      assert(executionMetrics(metrics("bucketsWritten").id) == "9")
+    }
+  }
+
+  test(s"Paimon Metric: insert bucket table") {
+    withSparkSQLConf("spark.paimon.write.use-v2-write" -> "true") {
+      sql("CREATE TABLE T (id INT, v INT) TBLPROPERTIES ('bucket'='3', 
'bucket-key'='id')")
+      val df = sql(s"INSERT INTO T SELECT /*+ REPARTITION(1) */ id, id as v 
FROM range(0, 100)")
+      val metrics = commandMetrics(df)
+      val executionMetrics = lastExecutionMetrics
+      assert(executionMetrics(metrics("addedTableFiles").id) == "3")
+      assert(executionMetrics(metrics("bucketsWritten").id) == "3")
+    }
+  }
+
+  test(s"Paimon Metric: insert overwrite") {
+    withSparkSQLConf("spark.paimon.write.use-v2-write" -> "true") {
+      sql("CREATE TABLE T (id INT, v INT)")
+      sql(s"INSERT INTO T SELECT /*+ REPARTITION(1) */ id, id as v FROM 
range(0, 10)")
+      val df =
+        sql(s"INSERT OVERWRITE T SELECT /*+ REPARTITION(1) */ id, id as v FROM 
range(0, 100)")
+      val metrics = commandMetrics(df)
+      val executionMetrics = lastExecutionMetrics
+      assert(executionMetrics(metrics("addedTableFiles").id) == "1")
+      assert(executionMetrics(metrics("insertedRecords").id) == "100")
+    }
+  }
+
+  test(s"Paimon Metric: delete") {
+    withSparkSQLConf("spark.paimon.write.use-v2-write" -> "true") {
+      sql("CREATE TABLE T (id INT, v INT)")
+      sql(s"INSERT INTO T SELECT /*+ REPARTITION(1) */ id, id as v FROM 
range(0, 10)")
+      val df = sql("DELETE FROM T WHERE id < 3")
+      val metrics = commandMetrics(df)
+      val executionMetrics = lastExecutionMetrics
+      assert(executionMetrics(metrics("addedTableFiles").id) == "1")
+      assert(executionMetrics(metrics("deletedTableFiles").id) == "1")
+
+      val df1 = sql("DELETE FROM T WHERE id > 0")
+      val metrics1 = commandMetrics(df1)
+      val executionMetrics1 = lastExecutionMetrics
+      assert(executionMetrics1(metrics1("addedTableFiles").id) == "0")
+      assert(executionMetrics1(metrics1("deletedTableFiles").id) == "1")
     }
   }
 
   def metric(metrics: Array[CustomTaskMetric], name: String): Long = {
     metrics.find(_.name() == name).get.value()
   }
+
+  def commandMetrics(df: DataFrame): Map[String, SQLMetric] = {
+    df.queryExecution.executedPlan.asInstanceOf[CommandResultExec].commandPhysicalPlan.metrics
+  }
+
+  def lastExecutionMetrics: Map[Long, String] = {
+    val statusStore = spark.sharedState.statusStore
+    val lastExecId = statusStore.executionsList().last.executionId
+    statusStore.executionMetrics(lastExecId)
+  }
 }
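
For reference, a minimal spark-shell style sketch of how the new delete metrics surface after
this change, mirroring the DELETE test above. It assumes a Paimon catalog with
spark.paimon.write.use-v2-write=true; the table name T and the values quoted in the comments
are illustrative.

    // Write one data file, then delete part of it (copy-on-write rewrite).
    spark.sql("CREATE TABLE T (id INT, v INT)")
    spark.sql("INSERT INTO T SELECT /*+ REPARTITION(1) */ id, id AS v FROM range(0, 10)")
    val df = spark.sql("DELETE FROM T WHERE id < 3")

    // Resolve metric ids from the physical plan, as commandMetrics(df) does in the test.
    val metrics = df.queryExecution.executedPlan
      .asInstanceOf[org.apache.spark.sql.execution.CommandResultExec]
      .commandPhysicalPlan.metrics

    // Read the rendered metric values of the last SQL execution from the status store.
    val statusStore = spark.sharedState.statusStore
    val lastExecId = statusStore.executionsList().last.executionId
    val executionMetrics = statusStore.executionMetrics(lastExecId)

    // With this fix the V2 write reports both sides of the delete:
    // addedTableFiles = "1" (the rewritten file), deletedTableFiles = "1" (the replaced file).
    println(executionMetrics(metrics("addedTableFiles").id))
    println(executionMetrics(metrics("deletedTableFiles").id))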
