This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new c4ca07a5f1 [spark] Update the display of size and timing metrics (#6722)
c4ca07a5f1 is described below

commit c4ca07a5f1878cc040f1d95833d0a02068e1f250
Author: Zouxxyy <[email protected]>
AuthorDate: Tue Dec 2 19:24:50 2025 +0800

    [spark] Update the display of size and timing metrics (#6722)
---
 .../org/apache/paimon/spark/PaimonBaseScan.scala   |   3 +-
 .../paimon/spark/PaimonFormatTableBaseScan.scala   |   3 +-
 .../org/apache/paimon/spark/PaimonMetrics.scala    | 102 ++++++++-------------
 .../paimon/spark/PaimonPartitionReader.scala       |  13 ++-
 .../org/apache/paimon/spark/PaimonSplitScan.scala  |   3 +-
 .../scala/org/apache/spark/sql/PaimonUtils.scala   |   4 +
 6 files changed, 54 insertions(+), 74 deletions(-)

diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScan.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScan.scala
index ef2efa6965..8be0e7a4e1 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScan.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScan.scala
@@ -110,8 +110,7 @@ abstract class PaimonBaseScan(
   override def supportedCustomMetrics: Array[CustomMetric] = {
     Array(
       PaimonNumSplitMetric(),
-      PaimonSplitSizeMetric(),
-      PaimonAvgSplitSizeMetric(),
+      PaimonPartitionSizeMetric(),
       PaimonPlanningDurationMetric(),
       PaimonScannedSnapshotIdMetric(),
       PaimonScannedManifestsMetric(),
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonFormatTableBaseScan.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonFormatTableBaseScan.scala
index e47b8aedc2..611136a261 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonFormatTableBaseScan.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonFormatTableBaseScan.scala
@@ -73,8 +73,7 @@ abstract class PaimonFormatTableBaseScan(
   override def supportedCustomMetrics: Array[CustomMetric] = {
     Array(
       PaimonNumSplitMetric(),
-      PaimonSplitSizeMetric(),
-      PaimonAvgSplitSizeMetric(),
+      PaimonPartitionSizeMetric(),
       PaimonResultedTableFilesMetric()
     )
   }
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonMetrics.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonMetrics.scala
index 1526739a6a..3f4ff179ce 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonMetrics.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonMetrics.scala
@@ -19,15 +19,12 @@
 package org.apache.paimon.spark
 
 import org.apache.spark.sql.PaimonUtils
-import org.apache.spark.sql.connector.metric.{CustomAvgMetric, CustomSumMetric, CustomTaskMetric}
-
-import java.text.DecimalFormat
+import org.apache.spark.sql.connector.metric.{CustomMetric, CustomTaskMetric}
 
 object PaimonMetrics {
   // scan metrics
   val NUM_SPLITS = "numSplits"
-  val SPLIT_SIZE = "splitSize"
-  val AVG_SPLIT_SIZE = "avgSplitSize"
+  val PARTITION_SIZE = "partitionSize"
   val PLANNING_DURATION = "planningDuration"
   val SCANNED_SNAPSHOT_ID = "scannedSnapshotId"
   val SCANNED_MANIFESTS = "scannedManifests"
@@ -50,52 +47,47 @@ object PaimonMetrics {
 sealed trait PaimonTaskMetric extends CustomTaskMetric
 
 // Base custom metrics
-sealed trait PaimonSumMetric extends CustomSumMetric {
-  protected def aggregateTaskMetrics0(taskMetrics: Array[Long]): Long = {
-    var sum: Long = 0L
-    for (taskMetric <- taskMetrics) {
-      sum += taskMetric
-    }
-    sum
-  }
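+// Base display trait: renders a raw Long metric value; the size/timing traits below override stringValue.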
+trait PaimonCustomMetric extends CustomMetric {
+  def stringValue(l: Long): String = l.toString
+}
 
-  override def aggregateTaskMetrics(taskMetrics: Array[Long]): String = {
-    String.valueOf(aggregateTaskMetrics0(taskMetrics))
-  }
+trait PaimonSizeMetric extends PaimonCustomMetric {
+  override def stringValue(l: Long): String = PaimonUtils.bytesToString(l)
 }
 
-sealed trait PaimonAvgMetric extends CustomAvgMetric {
-  protected def aggregateTaskMetrics0(taskMetrics: Array[Long]): Double = {
-    if (taskMetrics.length > 0) {
-      var sum = 0L
-      for (taskMetric <- taskMetrics) {
-        sum += taskMetric
-      }
-      sum.toDouble / taskMetrics.length
-    } else {
-      0d
-    }
-  }
+trait PaimonTimingMetric extends PaimonCustomMetric {
+  override def stringValue(l: Long): String = PaimonUtils.msDurationToString(l)
+}
 
+sealed trait PaimonSumMetric extends PaimonCustomMetric {
   override def aggregateTaskMetrics(taskMetrics: Array[Long]): String = {
-    val average = aggregateTaskMetrics0(taskMetrics)
-    new DecimalFormat("#0.000").format(average)
+    stringValue(taskMetrics.sum)
   }
 }
+sealed trait PaimonSizeSumMetric extends PaimonSumMetric with PaimonSizeMetric
+sealed trait PaimonTimingSumMetric extends PaimonSumMetric with PaimonTimingMetric
 
-sealed trait PaimonMinMaxMetric extends CustomAvgMetric {
+sealed trait PaimonSummaryMetric extends PaimonCustomMetric {
   def description0(): String
 
-  override def description(): String = s"${description0()} total (min, med, max)"
+  override def description(): String = s"${description0()} total (min, avg, med, max)"
 
   override def aggregateTaskMetrics(taskMetrics: Array[Long]): String = {
-    val total = taskMetrics.sum
-    val min = taskMetrics.min
-    val med = taskMetrics.sorted.apply(taskMetrics.length / 2)
-    val max = taskMetrics.max
-    s"$total ($min, $med, $max)"
+    if (taskMetrics.length == 0) {
+      s"None"
+    } else {
+      val sorted = taskMetrics.sorted
+      val total = sorted.sum
+      val min = sorted.head
+      val avg = total / sorted.length
+      val med = sorted.apply(sorted.length / 2)
+      val max = sorted.last
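+      // Leading newline keeps the summary on its own line in the metrics display.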
+      s"\n${stringValue(total)} (${stringValue(min)}, ${stringValue(avg)}, 
${stringValue(med)}, ${stringValue(max)})"
+    }
   }
 }
+sealed trait PaimonSizeSummaryMetric extends PaimonSummaryMetric with PaimonSizeMetric
+sealed trait PaimonTimingSummaryMetric extends PaimonSummaryMetric with PaimonTimingMetric
 
 // Scan metrics
 case class PaimonNumSplitMetric() extends PaimonSumMetric {
@@ -107,34 +99,18 @@ case class PaimonNumSplitsTaskMetric(override val value: Long) extends PaimonTas
   override def name(): String = PaimonMetrics.NUM_SPLITS
 }
 
-case class PaimonSplitSizeMetric() extends PaimonSumMetric {
-  override def name(): String = PaimonMetrics.SPLIT_SIZE
-  override def description(): String = "size of splits read"
-  override def aggregateTaskMetrics(taskMetrics: Array[Long]): String = {
-    PaimonUtils.bytesToString(aggregateTaskMetrics0(taskMetrics))
-  }
-}
-
-case class PaimonSplitSizeTaskMetric(override val value: Long) extends PaimonTaskMetric {
-  override def name(): String = PaimonMetrics.SPLIT_SIZE
-}
-
-case class PaimonAvgSplitSizeMetric() extends PaimonAvgMetric {
-  override def name(): String = PaimonMetrics.AVG_SPLIT_SIZE
-  override def description(): String = "avg size of splits read"
-  override def aggregateTaskMetrics(taskMetrics: Array[Long]): String = {
-    val average = aggregateTaskMetrics0(taskMetrics).round
-    PaimonUtils.bytesToString(average)
-  }
+case class PaimonPartitionSizeMetric() extends PaimonSizeSummaryMetric {
+  override def name(): String = PaimonMetrics.PARTITION_SIZE
+  override def description0(): String = "partition size"
 }
 
-case class PaimonAvgSplitSizeTaskMetric(override val value: Long) extends PaimonTaskMetric {
-  override def name(): String = PaimonMetrics.AVG_SPLIT_SIZE
+case class PaimonPartitionSizeTaskMetric(override val value: Long) extends PaimonTaskMetric {
+  override def name(): String = PaimonMetrics.PARTITION_SIZE
 }
 
-case class PaimonPlanningDurationMetric() extends PaimonSumMetric {
+case class PaimonPlanningDurationMetric() extends PaimonTimingSumMetric {
   override def name(): String = PaimonMetrics.PLANNING_DURATION
-  override def description(): String = "planing duration (ms)"
+  override def description(): String = "planing duration"
 }
 
 case class PaimonPlanningDurationTaskMetric(value: Long) extends PaimonTaskMetric {
@@ -178,7 +154,7 @@ case class PaimonResultedTableFilesTaskMetric(value: Long) extends PaimonTaskMet
 }
 
 // Write metrics
-case class PaimonNumWritersMetric() extends PaimonMinMaxMetric {
+case class PaimonNumWritersMetric() extends PaimonSummaryMetric {
   override def name(): String = PaimonMetrics.NUM_WRITERS
   override def description0(): String = "number of writers"
 }
@@ -188,9 +164,9 @@ case class PaimonNumWritersTaskMetric(value: Long) extends PaimonTaskMetric {
 }
 
 // Commit metrics
-case class PaimonCommitDurationMetric() extends PaimonSumMetric {
+case class PaimonCommitDurationMetric() extends PaimonTimingSumMetric {
   override def name(): String = PaimonMetrics.COMMIT_DURATION
-  override def description(): String = "commit duration (ms)"
+  override def description(): String = "commit duration"
 }
 
 case class PaimonCommitDurationTaskMetric(value: Long) extends PaimonTaskMetric {
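
For illustration only (not part of the commit): a minimal, self-contained Scala sketch of how the
new PaimonSummaryMetric aggregation renders, assuming a stand-in formatBytes in place of the real
PaimonUtils.bytesToString:

    object SummaryMetricSketch {
      // Stand-in formatter (assumption); the committed code delegates to Spark's Utils.bytesToString.
      private def formatBytes(l: Long): String =
        if (l < 1024) s"$l B" else f"${l / 1024.0 / 1024.0}%.1f MiB"

      // Mirrors PaimonSummaryMetric.aggregateTaskMetrics from the diff above.
      def aggregate(taskMetrics: Array[Long]): String =
        if (taskMetrics.isEmpty) "None"
        else {
          val sorted = taskMetrics.sorted
          val total = sorted.sum
          val avg = total / sorted.length
          val med = sorted(sorted.length / 2)
          s"\n${formatBytes(total)} (${formatBytes(sorted.head)}, ${formatBytes(avg)}, " +
            s"${formatBytes(med)}, ${formatBytes(sorted.last)})"
        }

      // Three partitions of 1 MiB, 2 MiB and 6 MiB print, after a leading blank line:
      // 9.0 MiB (1.0 MiB, 3.0 MiB, 2.0 MiB, 6.0 MiB)
      def main(args: Array[String]): Unit =
        println(aggregate(Array(1L << 20, 2L << 20, 6L << 20)))
    }
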
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionReader.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionReader.scala
index 582e946dda..2ca21a15e1 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionReader.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionReader.scala
@@ -108,20 +108,23 @@ case class PaimonPartitionReader(
     }
   }
 
-  override def currentMetricsValues(): Array[CustomTaskMetric] = {
+  // Compute partition metrics only once; currentMetricsValues() may be called repeatedly during the read.
+  private lazy val partitionMetrics: Array[CustomTaskMetric] = {
     val dataSplits = partition.splits.collect { case ds: DataSplit => ds }
     val numSplits = dataSplits.length
-    val paimonMetricsValues: Array[CustomTaskMetric] = if (dataSplits.nonEmpty) {
+    if (dataSplits.nonEmpty) {
+      val splitSize = dataSplits.map(_.dataFiles().asScala.map(_.fileSize).sum).sum
       Array(
         PaimonNumSplitsTaskMetric(numSplits),
-        PaimonSplitSizeTaskMetric(splitSize),
-        PaimonAvgSplitSizeTaskMetric(splitSize / numSplits)
+        PaimonPartitionSizeTaskMetric(splitSize)
       )
     } else {
       Array.empty[CustomTaskMetric]
     }
-    super.currentMetricsValues() ++ paimonMetricsValues
+  }
+
+  override def currentMetricsValues(): Array[CustomTaskMetric] = {
+    partitionMetrics
   }
 
   override def close(): Unit = {
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonSplitScan.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonSplitScan.scala
index 40b35f4d7c..ef2b171f50 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonSplitScan.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonSplitScan.scala
@@ -55,8 +55,7 @@ case class PaimonSplitScan(
   override def supportedCustomMetrics: Array[CustomMetric] = {
     Array(
       PaimonNumSplitMetric(),
-      PaimonSplitSizeMetric(),
-      PaimonAvgSplitSizeMetric(),
+      PaimonPartitionSizeMetric(),
       PaimonResultedTableFilesMetric()
     )
   }
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/PaimonUtils.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/PaimonUtils.scala
index 8f46f25518..68b878a018 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/PaimonUtils.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/PaimonUtils.scala
@@ -92,6 +92,10 @@ object PaimonUtils {
     SparkUtils.bytesToString(size)
   }
 
+  def msDurationToString(duration: Long): String = {
+    SparkUtils.msDurationToString(duration)
+  }
+
   def setInputFileName(inputFileName: String): Unit = {
     InputFileBlockHolder.set(inputFileName, 0, -1)
   }
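
Note (illustrative, not part of the commit): both helpers delegate to Spark's Utils, so the exact
rendering depends on the Spark version, roughly:

    PaimonUtils.bytesToString(3145728)     // e.g. "3.0 MiB"
    PaimonUtils.msDurationToString(83000)  // e.g. "1.4 m"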
