This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new dacd8a7fb7 [spark] Fix spark write metrics with spark adaptive plan
(#6814)
dacd8a7fb7 is described below
commit dacd8a7fb7e4cc0f1433ff762b4cfb361c4f8401
Author: Zouxxyy <[email protected]>
AuthorDate: Tue Dec 16 08:51:40 2025 +0800
[spark] Fix spark write metrics with spark adaptive plan (#6814)
---
.../org/apache/paimon/spark/write/PaimonV2Write.scala | 2 +-
.../org/apache/paimon/spark/sql/PaimonMetricTest.scala | 17 +++++++++++++++++
2 files changed, 18 insertions(+), 1 deletion(-)
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonV2Write.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonV2Write.scala
index 519550cbf8..79c2d5c8fc 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonV2Write.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonV2Write.scala
@@ -159,7 +159,7 @@ private case class PaimonBatchWrite(
// todo: find a more suitable way to get metrics.
val commitMetrics = metricRegistry.buildSparkCommitMetrics()
val executionId =
spark.sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
- val executionMetrics = Compatibility.getExecutionMetrics(spark,
executionId.toLong)
+ val executionMetrics = Compatibility.getExecutionMetrics(spark,
executionId.toLong).distinct
val metricUpdates = executionMetrics.flatMap {
m =>
commitMetrics.find(x =>
m.metricType.toLowerCase.contains(x.name.toLowerCase)) match {
diff --git
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonMetricTest.scala
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonMetricTest.scala
index 292fb587e6..d776a00f93 100644
---
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonMetricTest.scala
+++
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonMetricTest.scala
@@ -140,6 +140,23 @@ class PaimonMetricTest extends PaimonSparkTestBase with
ScanPlanHelper {
}
}
+ test(s"Paimon Metric: v2 write metric with adaptive plan") {
+ withSparkSQLConf("spark.paimon.write.use-v2-write" -> "true") {
+ sql("CREATE TABLE T (id INT, pt INT) PARTITIONED BY (pt)")
+ val df = sql(s"INSERT INTO T SELECT /*+ REPARTITION(1) */ id, id FROM
range(1, 10)")
+ val metrics =
+
df.queryExecution.executedPlan.asInstanceOf[CommandResultExec].commandPhysicalPlan.metrics
+ val statusStore = spark.sharedState.statusStore
+ val lastExecId = statusStore.executionsList().last.executionId
+ val executionMetrics = statusStore.executionMetrics(lastExecId)
+
+ assert(executionMetrics(metrics("appendedTableFiles").id) == "9")
+ assert(executionMetrics(metrics("appendedRecords").id) == "9")
+ assert(executionMetrics(metrics("partitionsWritten").id) == "9")
+ assert(executionMetrics(metrics("bucketsWritten").id) == "9")
+ }
+ }
+
def metric(metrics: Array[CustomTaskMetric], name: String): Long = {
metrics.find(_.name() == name).get.value()
}