This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new c4ca07a5f1 [spark] Update the display of size and timing metrics (#6722)
c4ca07a5f1 is described below
commit c4ca07a5f1878cc040f1d95833d0a02068e1f250
Author: Zouxxyy <[email protected]>
AuthorDate: Tue Dec 2 19:24:50 2025 +0800
[spark] Update the display of size and timing metrics (#6722)
---
.../org/apache/paimon/spark/PaimonBaseScan.scala | 3 +-
.../paimon/spark/PaimonFormatTableBaseScan.scala | 3 +-
.../org/apache/paimon/spark/PaimonMetrics.scala | 102 ++++++++-------------
.../paimon/spark/PaimonPartitionReader.scala | 13 ++-
.../org/apache/paimon/spark/PaimonSplitScan.scala | 3 +-
.../scala/org/apache/spark/sql/PaimonUtils.scala | 4 +
6 files changed, 54 insertions(+), 74 deletions(-)
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScan.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScan.scala
index ef2efa6965..8be0e7a4e1 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScan.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScan.scala
@@ -110,8 +110,7 @@ abstract class PaimonBaseScan(
override def supportedCustomMetrics: Array[CustomMetric] = {
Array(
PaimonNumSplitMetric(),
- PaimonSplitSizeMetric(),
- PaimonAvgSplitSizeMetric(),
+ PaimonPartitionSizeMetric(),
PaimonPlanningDurationMetric(),
PaimonScannedSnapshotIdMetric(),
PaimonScannedManifestsMetric(),
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonFormatTableBaseScan.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonFormatTableBaseScan.scala
index e47b8aedc2..611136a261 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonFormatTableBaseScan.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonFormatTableBaseScan.scala
@@ -73,8 +73,7 @@ abstract class PaimonFormatTableBaseScan(
override def supportedCustomMetrics: Array[CustomMetric] = {
Array(
PaimonNumSplitMetric(),
- PaimonSplitSizeMetric(),
- PaimonAvgSplitSizeMetric(),
+ PaimonPartitionSizeMetric(),
PaimonResultedTableFilesMetric()
)
}
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonMetrics.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonMetrics.scala
index 1526739a6a..3f4ff179ce 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonMetrics.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonMetrics.scala
@@ -19,15 +19,12 @@
package org.apache.paimon.spark
import org.apache.spark.sql.PaimonUtils
-import org.apache.spark.sql.connector.metric.{CustomAvgMetric,
CustomSumMetric, CustomTaskMetric}
-
-import java.text.DecimalFormat
+import org.apache.spark.sql.connector.metric.{CustomMetric, CustomTaskMetric}
object PaimonMetrics {
// scan metrics
val NUM_SPLITS = "numSplits"
- val SPLIT_SIZE = "splitSize"
- val AVG_SPLIT_SIZE = "avgSplitSize"
+ val PARTITION_SIZE = "partitionSize"
val PLANNING_DURATION = "planningDuration"
val SCANNED_SNAPSHOT_ID = "scannedSnapshotId"
val SCANNED_MANIFESTS = "scannedManifests"
@@ -50,52 +47,47 @@ object PaimonMetrics {
sealed trait PaimonTaskMetric extends CustomTaskMetric
// Base custom metrics
-sealed trait PaimonSumMetric extends CustomSumMetric {
- protected def aggregateTaskMetrics0(taskMetrics: Array[Long]): Long = {
- var sum: Long = 0L
- for (taskMetric <- taskMetrics) {
- sum += taskMetric
- }
- sum
- }
+trait PaimonCustomMetric extends CustomMetric {
+ def stringValue(l: Long): String = l.toString
+}
- override def aggregateTaskMetrics(taskMetrics: Array[Long]): String = {
- String.valueOf(aggregateTaskMetrics0(taskMetrics))
- }
+trait PaimonSizeMetric extends PaimonCustomMetric {
+ override def stringValue(l: Long): String = PaimonUtils.bytesToString(l)
}
-sealed trait PaimonAvgMetric extends CustomAvgMetric {
- protected def aggregateTaskMetrics0(taskMetrics: Array[Long]): Double = {
- if (taskMetrics.length > 0) {
- var sum = 0L
- for (taskMetric <- taskMetrics) {
- sum += taskMetric
- }
- sum.toDouble / taskMetrics.length
- } else {
- 0d
- }
- }
+trait PaimonTimingMetric extends PaimonCustomMetric {
+ override def stringValue(l: Long): String = PaimonUtils.msDurationToString(l)
+}
+sealed trait PaimonSumMetric extends PaimonCustomMetric {
override def aggregateTaskMetrics(taskMetrics: Array[Long]): String = {
- val average = aggregateTaskMetrics0(taskMetrics)
- new DecimalFormat("#0.000").format(average)
+ stringValue(taskMetrics.sum)
}
}
+sealed trait PaimonSizeSumMetric extends PaimonSumMetric with PaimonSizeMetric
+sealed trait PaimonTimingSumMetric extends PaimonSumMetric with PaimonTimingMetric
-sealed trait PaimonMinMaxMetric extends CustomAvgMetric {
+sealed trait PaimonSummaryMetric extends PaimonCustomMetric {
def description0(): String
- override def description(): String = s"${description0()} total (min, med, max)"
+ override def description(): String = s"${description0()} total (min, avg, med, max)"
override def aggregateTaskMetrics(taskMetrics: Array[Long]): String = {
- val total = taskMetrics.sum
- val min = taskMetrics.min
- val med = taskMetrics.sorted.apply(taskMetrics.length / 2)
- val max = taskMetrics.max
- s"$total ($min, $med, $max)"
+ if (taskMetrics.length == 0) {
+ s"None"
+ } else {
+ val sorted = taskMetrics.sorted
+ val total = sorted.sum
+ val min = sorted.head
+ val avg = total / sorted.length
+ val med = sorted.apply(sorted.length / 2)
+ val max = sorted.last
+ s"\n${stringValue(total)} (${stringValue(min)}, ${stringValue(avg)}, ${stringValue(med)}, ${stringValue(max)})"
+ }
}
}
+sealed trait PaimonSizeSummaryMetric extends PaimonSummaryMetric with PaimonSizeMetric
+sealed trait PaimonTimingSummaryMetric extends PaimonSummaryMetric with PaimonTimingMetric
// Scan metrics
case class PaimonNumSplitMetric() extends PaimonSumMetric {
@@ -107,34 +99,18 @@ case class PaimonNumSplitsTaskMetric(override val value:
Long) extends PaimonTas
override def name(): String = PaimonMetrics.NUM_SPLITS
}
-case class PaimonSplitSizeMetric() extends PaimonSumMetric {
- override def name(): String = PaimonMetrics.SPLIT_SIZE
- override def description(): String = "size of splits read"
- override def aggregateTaskMetrics(taskMetrics: Array[Long]): String = {
- PaimonUtils.bytesToString(aggregateTaskMetrics0(taskMetrics))
- }
-}
-
-case class PaimonSplitSizeTaskMetric(override val value: Long) extends
PaimonTaskMetric {
- override def name(): String = PaimonMetrics.SPLIT_SIZE
-}
-
-case class PaimonAvgSplitSizeMetric() extends PaimonAvgMetric {
- override def name(): String = PaimonMetrics.AVG_SPLIT_SIZE
- override def description(): String = "avg size of splits read"
- override def aggregateTaskMetrics(taskMetrics: Array[Long]): String = {
- val average = aggregateTaskMetrics0(taskMetrics).round
- PaimonUtils.bytesToString(average)
- }
+case class PaimonPartitionSizeMetric() extends PaimonSizeSummaryMetric {
+ override def name(): String = PaimonMetrics.PARTITION_SIZE
+ override def description0(): String = "partition size"
}
-case class PaimonAvgSplitSizeTaskMetric(override val value: Long) extends
PaimonTaskMetric {
- override def name(): String = PaimonMetrics.AVG_SPLIT_SIZE
+case class PaimonPartitionSizeTaskMetric(override val value: Long) extends
PaimonTaskMetric {
+ override def name(): String = PaimonMetrics.PARTITION_SIZE
}
-case class PaimonPlanningDurationMetric() extends PaimonSumMetric {
+case class PaimonPlanningDurationMetric() extends PaimonTimingSumMetric {
override def name(): String = PaimonMetrics.PLANNING_DURATION
- override def description(): String = "planing duration (ms)"
+ override def description(): String = "planing duration"
}
case class PaimonPlanningDurationTaskMetric(value: Long) extends
PaimonTaskMetric {
@@ -178,7 +154,7 @@ case class PaimonResultedTableFilesTaskMetric(value: Long)
extends PaimonTaskMet
}
// Write metrics
-case class PaimonNumWritersMetric() extends PaimonMinMaxMetric {
+case class PaimonNumWritersMetric() extends PaimonSummaryMetric {
override def name(): String = PaimonMetrics.NUM_WRITERS
override def description0(): String = "number of writers"
}
@@ -188,9 +164,9 @@ case class PaimonNumWritersTaskMetric(value: Long) extends
PaimonTaskMetric {
}
// Commit metrics
-case class PaimonCommitDurationMetric() extends PaimonSumMetric {
+case class PaimonCommitDurationMetric() extends PaimonTimingSumMetric {
override def name(): String = PaimonMetrics.COMMIT_DURATION
- override def description(): String = "commit duration (ms)"
+ override def description(): String = "commit duration"
}
case class PaimonCommitDurationTaskMetric(value: Long) extends
PaimonTaskMetric {
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionReader.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionReader.scala
index 582e946dda..2ca21a15e1 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionReader.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionReader.scala
@@ -108,20 +108,23 @@ case class PaimonPartitionReader(
}
}
- override def currentMetricsValues(): Array[CustomTaskMetric] = {
+ // Partition metrics need to be computed only once.
+ private lazy val partitionMetrics: Array[CustomTaskMetric] = {
val dataSplits = partition.splits.collect { case ds: DataSplit => ds }
val numSplits = dataSplits.length
- val paimonMetricsValues: Array[CustomTaskMetric] = if
(dataSplits.nonEmpty) {
+ if (dataSplits.nonEmpty) {
val splitSize =
dataSplits.map(_.dataFiles().asScala.map(_.fileSize).sum).sum
Array(
PaimonNumSplitsTaskMetric(numSplits),
- PaimonSplitSizeTaskMetric(splitSize),
- PaimonAvgSplitSizeTaskMetric(splitSize / numSplits)
+ PaimonPartitionSizeTaskMetric(splitSize)
)
} else {
Array.empty[CustomTaskMetric]
}
- super.currentMetricsValues() ++ paimonMetricsValues
+ }
+
+ override def currentMetricsValues(): Array[CustomTaskMetric] = {
+ partitionMetrics
}
override def close(): Unit = {
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonSplitScan.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonSplitScan.scala
index 40b35f4d7c..ef2b171f50 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonSplitScan.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonSplitScan.scala
@@ -55,8 +55,7 @@ case class PaimonSplitScan(
override def supportedCustomMetrics: Array[CustomMetric] = {
Array(
PaimonNumSplitMetric(),
- PaimonSplitSizeMetric(),
- PaimonAvgSplitSizeMetric(),
+ PaimonPartitionSizeMetric(),
PaimonResultedTableFilesMetric()
)
}
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/PaimonUtils.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/PaimonUtils.scala
index 8f46f25518..68b878a018 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/PaimonUtils.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/PaimonUtils.scala
@@ -92,6 +92,10 @@ object PaimonUtils {
SparkUtils.bytesToString(size)
}
+ def msDurationToString(size: Long): String = {
+ SparkUtils.msDurationToString(size)
+ }
+
def setInputFileName(inputFileName: String): Unit = {
InputFileBlockHolder.set(inputFileName, 0, -1)
}