This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new dacd8a7fb7 [spark] Fix spark write metrics with spark adaptive plan (#6814)
dacd8a7fb7 is described below

commit dacd8a7fb7e4cc0f1433ff762b4cfb361c4f8401
Author: Zouxxyy <[email protected]>
AuthorDate: Tue Dec 16 08:51:40 2025 +0800

    [spark] Fix spark write metrics with spark adaptive plan (#6814)
---
 .../org/apache/paimon/spark/write/PaimonV2Write.scala   |  2 +-
 .../org/apache/paimon/spark/sql/PaimonMetricTest.scala  | 17 +++++++++++++++++
 2 files changed, 18 insertions(+), 1 deletion(-)

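Background for the one-line fix below: with Spark's adaptive query execution, the SQL listener can report the same metric entry for an execution more than once, so summing the reported values double-counts the write metrics; deduplicating the entries first avoids that. A minimal sketch of the effect (MetricEntry is a simplified stand-in for Spark's reported metric type, not the actual class):

    // Case classes compare structurally, which is what Seq.distinct relies on.
    case class MetricEntry(name: String, accumulatorId: Long, metricType: String)

    object DistinctSketch extends App {
      // The same entry reported twice, as can happen under an adaptive plan.
      val reported = Seq(
        MetricEntry("number of written files", 1L, "numberofwrittenfiles"),
        MetricEntry("number of written files", 1L, "numberofwrittenfiles"))

      assert(reported.size == 2)
      assert(reported.distinct.size == 1) // duplicates collapse before summing
    }
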
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonV2Write.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonV2Write.scala
index 519550cbf8..79c2d5c8fc 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonV2Write.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonV2Write.scala
@@ -159,7 +159,7 @@ private case class PaimonBatchWrite(
     // todo: find a more suitable way to get metrics.
     val commitMetrics = metricRegistry.buildSparkCommitMetrics()
    val executionId = spark.sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
-    val executionMetrics = Compatibility.getExecutionMetrics(spark, executionId.toLong)
+    val executionMetrics = Compatibility.getExecutionMetrics(spark, executionId.toLong).distinct
     val metricUpdates = executionMetrics.flatMap {
       m =>
        commitMetrics.find(x => m.metricType.toLowerCase.contains(x.name.toLowerCase)) match {
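
For context, the lookup in the hunk above pairs each reported execution metric with a Paimon commit metric through a case-insensitive substring match on the metric type. A minimal sketch with simplified stand-in types (not Paimon's actual classes):

    case class CommitMetric(name: String, value: Long)
    case class ExecMetric(metricType: String, accumulatorId: Long)

    object MatchSketch extends App {
      val commitMetrics = Seq(CommitMetric("appendedRecords", 9L))
      val executionMetrics =
        Seq(ExecMetric("AppendedRecords", 1L), ExecMetric("somethingElse", 2L))

      // Keep only the reported metrics whose type mentions a known commit metric name.
      val matched = executionMetrics.flatMap {
        m => commitMetrics.find(x => m.metricType.toLowerCase.contains(x.name.toLowerCase))
      }
      assert(matched.map(_.name) == Seq("appendedRecords"))
    }
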
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonMetricTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonMetricTest.scala
index 292fb587e6..d776a00f93 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonMetricTest.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonMetricTest.scala
@@ -140,6 +140,23 @@ class PaimonMetricTest extends PaimonSparkTestBase with ScanPlanHelper {
     }
   }
 
+  test(s"Paimon Metric: v2 write metric with adaptive plan") {
+    withSparkSQLConf("spark.paimon.write.use-v2-write" -> "true") {
+      sql("CREATE TABLE T (id INT, pt INT) PARTITIONED BY (pt)")
+      val df = sql(s"INSERT INTO T SELECT /*+ REPARTITION(1) */ id, id FROM range(1, 10)")
+      val metrics =
+        df.queryExecution.executedPlan.asInstanceOf[CommandResultExec].commandPhysicalPlan.metrics
+      val statusStore = spark.sharedState.statusStore
+      val lastExecId = statusStore.executionsList().last.executionId
+      val executionMetrics = statusStore.executionMetrics(lastExecId)
+
+      assert(executionMetrics(metrics("appendedTableFiles").id) == "9")
+      assert(executionMetrics(metrics("appendedRecords").id) == "9")
+      assert(executionMetrics(metrics("partitionsWritten").id) == "9")
+      assert(executionMetrics(metrics("bucketsWritten").id) == "9")
+    }
+  }
+
   def metric(metrics: Array[CustomTaskMetric], name: String): Long = {
     metrics.find(_.name() == name).get.value()
   }

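A note on the expected values in the test above: range(1, 10) yields ids 1 through 9, and each row lands in its own pt partition, so the single INSERT writes 9 records across 9 partitions; the assertions likewise expect 9 data files and 9 buckets. Without the distinct fix, the duplicated metric entries produced under the adaptive plan could double-count these values.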