This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new e4543c4775 [spark] Add v2 write metrics (#6386)
e4543c4775 is described below
commit e4543c477599618848dc33b7a046e5f2a6bd5ac2
Author: Zouxxyy <[email protected]>
AuthorDate: Mon Oct 13 21:02:13 2025 +0800
[spark] Add v2 write metrics (#6386)
---
.../paimon/operation/metrics/CommitMetrics.java | 46 +++---
.../operation/metrics/WriterBufferMetric.java | 10 +-
.../paimon/spark/catalyst/Compatibility.scala | 6 +
.../paimon/spark/catalyst/Compatibility.scala | 6 +
.../org/apache/paimon/spark/PaimonMetrics.scala | 169 ++++++++++++++-------
.../paimon/spark/catalyst/Compatibility.scala | 6 +
.../paimon/spark/metric/SparkMetricRegistry.scala | 66 ++++++--
.../apache/paimon/spark/write/PaimonV2Write.scala | 67 +++++++-
.../apache/paimon/spark/sql/PaimonMetricTest.scala | 19 +++
9 files changed, 297 insertions(+), 98 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/metrics/CommitMetrics.java b/paimon-core/src/main/java/org/apache/paimon/operation/metrics/CommitMetrics.java
index e64fc7be16..c89bae8c9a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/metrics/CommitMetrics.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/metrics/CommitMetrics.java
@@ -27,7 +27,7 @@ import org.apache.paimon.metrics.MetricRegistry;
public class CommitMetrics {
private static final int HISTOGRAM_WINDOW_SIZE = 100;
- private static final String GROUP_NAME = "commit";
+ public static final String GROUP_NAME = "commit";
private final MetricGroup metricGroup;
@@ -44,40 +44,36 @@ public class CommitMetrics {
private Histogram durationHistogram;
private CommitStats latestCommit;
- @VisibleForTesting static final String LAST_COMMIT_DURATION = "lastCommitDuration";
- @VisibleForTesting static final String COMMIT_DURATION = "commitDuration";
- @VisibleForTesting static final String LAST_COMMIT_ATTEMPTS = "lastCommitAttempts";
- @VisibleForTesting static final String LAST_TABLE_FILES_ADDED = "lastTableFilesAdded";
- @VisibleForTesting static final String LAST_TABLE_FILES_DELETED = "lastTableFilesDeleted";
- @VisibleForTesting static final String LAST_TABLE_FILES_APPENDED = "lastTableFilesAppended";
+ public static final String LAST_COMMIT_DURATION = "lastCommitDuration";
+ public static final String COMMIT_DURATION = "commitDuration";
+ public static final String LAST_COMMIT_ATTEMPTS = "lastCommitAttempts";
+ public static final String LAST_TABLE_FILES_ADDED = "lastTableFilesAdded";
+ public static final String LAST_TABLE_FILES_DELETED = "lastTableFilesDeleted";
+ public static final String LAST_TABLE_FILES_APPENDED = "lastTableFilesAppended";
- @VisibleForTesting
- static final String LAST_TABLE_FILES_COMMIT_COMPACTED = "lastTableFilesCommitCompacted";
+ public static final String LAST_TABLE_FILES_COMMIT_COMPACTED = "lastTableFilesCommitCompacted";
- @VisibleForTesting
- static final String LAST_CHANGELOG_FILES_APPENDED = "lastChangelogFilesAppended";
+ public static final String LAST_CHANGELOG_FILES_APPENDED = "lastChangelogFilesAppended";
- @VisibleForTesting
- static final String LAST_CHANGELOG_FILES_COMMIT_COMPACTED = "lastChangelogFileCommitCompacted";
+ public static final String LAST_CHANGELOG_FILES_COMMIT_COMPACTED =
+ "lastChangelogFileCommitCompacted";
- @VisibleForTesting static final String LAST_GENERATED_SNAPSHOTS = "lastGeneratedSnapshots";
- @VisibleForTesting static final String LAST_DELTA_RECORDS_APPENDED = "lastDeltaRecordsAppended";
+ public static final String LAST_GENERATED_SNAPSHOTS = "lastGeneratedSnapshots";
+ public static final String LAST_DELTA_RECORDS_APPENDED = "lastDeltaRecordsAppended";
- @VisibleForTesting
- static final String LAST_CHANGELOG_RECORDS_APPENDED = "lastChangelogRecordsAppended";
+ public static final String LAST_CHANGELOG_RECORDS_APPENDED = "lastChangelogRecordsAppended";
- @VisibleForTesting
- static final String LAST_DELTA_RECORDS_COMMIT_COMPACTED = "lastDeltaRecordsCommitCompacted";
+ public static final String LAST_DELTA_RECORDS_COMMIT_COMPACTED =
+ "lastDeltaRecordsCommitCompacted";
- @VisibleForTesting
- static final String LAST_CHANGELOG_RECORDS_COMMIT_COMPACTED =
+ public static final String LAST_CHANGELOG_RECORDS_COMMIT_COMPACTED =
"lastChangelogRecordsCommitCompacted";
- @VisibleForTesting static final String LAST_PARTITIONS_WRITTEN = "lastPartitionsWritten";
- @VisibleForTesting static final String LAST_BUCKETS_WRITTEN = "lastBucketsWritten";
+ public static final String LAST_PARTITIONS_WRITTEN = "lastPartitionsWritten";
+ public static final String LAST_BUCKETS_WRITTEN = "lastBucketsWritten";
- static final String LAST_COMPACTION_INPUT_FILE_SIZE = "lastCompactionInputFileSize";
- static final String LAST_COMPACTION_OUTPUT_FILE_SIZE = "lastCompactionOutputFileSize";
+ public static final String LAST_COMPACTION_INPUT_FILE_SIZE = "lastCompactionInputFileSize";
+ public static final String LAST_COMPACTION_OUTPUT_FILE_SIZE = "lastCompactionOutputFileSize";
private void registerGenericCommitMetrics() {
metricGroup.gauge(
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/metrics/WriterBufferMetric.java b/paimon-core/src/main/java/org/apache/paimon/operation/metrics/WriterBufferMetric.java
index e4c676ab1e..fb5963178d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/metrics/WriterBufferMetric.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/metrics/WriterBufferMetric.java
@@ -29,11 +29,11 @@ import java.util.function.Supplier;
/** Metrics for writer buffer. */
public class WriterBufferMetric {
- private static final String GROUP_NAME = "writerBuffer";
- private static final String NUM_WRITERS = "numWriters";
- private static final String BUFFER_PREEMPT_COUNT = "bufferPreemptCount";
- private static final String USED_WRITE_BUFFER_SIZE = "usedWriteBufferSizeByte";
- private static final String TOTAL_WRITE_BUFFER_SIZE = "totalWriteBufferSizeByte";
+ public static final String GROUP_NAME = "writerBuffer";
+ public static final String NUM_WRITERS = "numWriters";
+ public static final String BUFFER_PREEMPT_COUNT = "bufferPreemptCount";
+ public static final String USED_WRITE_BUFFER_SIZE = "usedWriteBufferSizeByte";
+ public static final String TOTAL_WRITE_BUFFER_SIZE = "totalWriteBufferSizeByte";
private final MetricGroup metricGroup;
private final AtomicInteger numWriters;
diff --git a/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/catalyst/Compatibility.scala b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/catalyst/Compatibility.scala
index 02e6efa6c6..619521c63c 100644
--- a/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/catalyst/Compatibility.scala
+++ b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/catalyst/Compatibility.scala
@@ -18,9 +18,11 @@
package org.apache.paimon.spark.catalyst
+import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.expressions.{Cast, Expression}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, V2WriteCommand}
import org.apache.spark.sql.catalyst.trees.TreeNodeTag
+import org.apache.spark.sql.execution.ui.SQLPlanMetric
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.DataType
@@ -41,4 +43,8 @@ object Compatibility {
ansiEnabled: Boolean = SQLConf.get.ansiEnabled): Cast = {
Cast(child, dataType, timeZoneId, ansiEnabled)
}
+
+ def getExecutionMetrics(spark: SparkSession, executionId: Long): Seq[SQLPlanMetric] = {
+ spark.sharedState.statusStore.execution(executionId).get.metrics.toSeq
+ }
}
diff --git a/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/paimon/spark/catalyst/Compatibility.scala b/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/paimon/spark/catalyst/Compatibility.scala
index 02e6efa6c6..619521c63c 100644
--- a/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/paimon/spark/catalyst/Compatibility.scala
+++ b/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/paimon/spark/catalyst/Compatibility.scala
@@ -18,9 +18,11 @@
package org.apache.paimon.spark.catalyst
+import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.expressions.{Cast, Expression}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, V2WriteCommand}
import org.apache.spark.sql.catalyst.trees.TreeNodeTag
+import org.apache.spark.sql.execution.ui.SQLPlanMetric
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.DataType
@@ -41,4 +43,8 @@ object Compatibility {
ansiEnabled: Boolean = SQLConf.get.ansiEnabled): Cast = {
Cast(child, dataType, timeZoneId, ansiEnabled)
}
+
+ def getExecutionMetrics(spark: SparkSession, executionId: Long): Seq[SQLPlanMetric] = {
+ spark.sharedState.statusStore.execution(executionId).get.metrics.toSeq
+ }
}
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonMetrics.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonMetrics.scala
index 3ef5bbdbcb..1ae6aeb6a2 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonMetrics.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonMetrics.scala
@@ -24,46 +24,32 @@ import org.apache.spark.sql.connector.metric.{CustomAvgMetric, CustomSumMetric,
import java.text.DecimalFormat
object PaimonMetrics {
-
+ // scan metrics
val NUM_SPLITS = "numSplits"
-
val SPLIT_SIZE = "splitSize"
-
val AVG_SPLIT_SIZE = "avgSplitSize"
-
val PLANNING_DURATION = "planningDuration"
-
val SCANNED_MANIFESTS = "scannedManifests"
-
val SKIPPED_TABLE_FILES = "skippedTableFiles"
-
val RESULTED_TABLE_FILES = "resultedTableFiles"
-}
-
-// paimon's task metric
-sealed trait PaimonTaskMetric extends CustomTaskMetric
-case class PaimonNumSplitsTaskMetric(override val value: Long) extends PaimonTaskMetric {
-
- override def name(): String = PaimonMetrics.NUM_SPLITS
+ // write metrics
+ val NUM_WRITERS = "numWriters"
+ // commit metrics
+ val COMMIT_DURATION = "commitDuration"
+ val APPENDED_TABLE_FILES = "appendedTableFiles"
+ val APPENDED_RECORDS = "appendedRecords"
+ val APPENDED_CHANGELOG_FILES = "appendedChangelogFiles"
+ val PARTITIONS_WRITTEN = "partitionsWritten"
+ val BUCKETS_WRITTEN = "bucketsWritten"
}
-case class PaimonSplitSizeTaskMetric(override val value: Long) extends PaimonTaskMetric {
-
- override def name(): String = PaimonMetrics.SPLIT_SIZE
-
-}
-
-case class PaimonAvgSplitSizeTaskMetric(override val value: Long) extends PaimonTaskMetric {
-
- override def name(): String = PaimonMetrics.AVG_SPLIT_SIZE
-
-}
+// Base custom task metrics
+sealed trait PaimonTaskMetric extends CustomTaskMetric
-// paimon's sum metric
+// Base custom metrics
sealed trait PaimonSumMetric extends CustomSumMetric {
-
protected def aggregateTaskMetrics0(taskMetrics: Array[Long]): Long = {
var sum: Long = 0L
for (taskMetric <- taskMetrics) {
@@ -75,31 +61,9 @@ sealed trait PaimonSumMetric extends CustomSumMetric {
override def aggregateTaskMetrics(taskMetrics: Array[Long]): String = {
String.valueOf(aggregateTaskMetrics0(taskMetrics))
}
-
-}
-
-case class PaimonNumSplitMetric() extends PaimonSumMetric {
-
- override def name(): String = PaimonMetrics.NUM_SPLITS
-
- override def description(): String = "number of splits read"
-
-}
-
-case class PaimonSplitSizeMetric() extends PaimonSumMetric {
-
- override def name(): String = PaimonMetrics.SPLIT_SIZE
-
- override def description(): String = "size of splits read"
-
- override def aggregateTaskMetrics(taskMetrics: Array[Long]): String = {
- PaimonUtils.bytesToString(aggregateTaskMetrics0(taskMetrics))
- }
}
-// paimon's avg metric
sealed trait PaimonAvgMetric extends CustomAvgMetric {
-
protected def aggregateTaskMetrics0(taskMetrics: Array[Long]): Double = {
if (taskMetrics.length > 0) {
var sum = 0L
@@ -116,23 +80,57 @@ sealed trait PaimonAvgMetric extends CustomAvgMetric {
val average = aggregateTaskMetrics0(taskMetrics)
new DecimalFormat("#0.000").format(average)
}
+}
+sealed trait PaimonMinMaxMetric extends CustomAvgMetric {
+ def description0(): String
+
+ override def description(): String = s"${description0()} total (min, med,
max)"
+
+ override def aggregateTaskMetrics(taskMetrics: Array[Long]): String = {
+ val total = taskMetrics.sum
+ val min = taskMetrics.min
+ val med = taskMetrics.sorted.apply(taskMetrics.length / 2)
+ val max = taskMetrics.max
+ s"$total ($min, $med, $max)"
+ }
}
-case class PaimonAvgSplitSizeMetric() extends PaimonAvgMetric {
+// Scan metrics
+case class PaimonNumSplitMetric() extends PaimonSumMetric {
+ override def name(): String = PaimonMetrics.NUM_SPLITS
+ override def description(): String = "number of splits read"
+}
- override def name(): String = PaimonMetrics.AVG_SPLIT_SIZE
+case class PaimonNumSplitsTaskMetric(override val value: Long) extends PaimonTaskMetric {
+ override def name(): String = PaimonMetrics.NUM_SPLITS
+}
- override def description(): String = "avg size of splits read"
+case class PaimonSplitSizeMetric() extends PaimonSumMetric {
+ override def name(): String = PaimonMetrics.SPLIT_SIZE
+ override def description(): String = "size of splits read"
+ override def aggregateTaskMetrics(taskMetrics: Array[Long]): String = {
+ PaimonUtils.bytesToString(aggregateTaskMetrics0(taskMetrics))
+ }
+}
+case class PaimonSplitSizeTaskMetric(override val value: Long) extends PaimonTaskMetric {
+ override def name(): String = PaimonMetrics.SPLIT_SIZE
+}
+
+case class PaimonAvgSplitSizeMetric() extends PaimonAvgMetric {
+ override def name(): String = PaimonMetrics.AVG_SPLIT_SIZE
+ override def description(): String = "avg size of splits read"
override def aggregateTaskMetrics(taskMetrics: Array[Long]): String = {
val average = aggregateTaskMetrics0(taskMetrics).round
PaimonUtils.bytesToString(average)
}
+}
+case class PaimonAvgSplitSizeTaskMetric(override val value: Long) extends PaimonTaskMetric {
+ override def name(): String = PaimonMetrics.AVG_SPLIT_SIZE
}
-// Metrics reported by driver
case class PaimonPlanningDurationMetric() extends PaimonSumMetric {
override def name(): String = PaimonMetrics.PLANNING_DURATION
override def description(): String = "planing duration (ms)"
@@ -168,3 +166,68 @@ case class PaimonResultedTableFilesMetric() extends PaimonSumMetric {
case class PaimonResultedTableFilesTaskMetric(value: Long) extends PaimonTaskMetric {
override def name(): String = PaimonMetrics.RESULTED_TABLE_FILES
}
+
+// Write metrics
+case class PaimonNumWritersMetric() extends PaimonMinMaxMetric {
+ override def name(): String = PaimonMetrics.NUM_WRITERS
+ override def description0(): String = "number of writers"
+}
+
+case class PaimonNumWritersTaskMetric(value: Long) extends PaimonTaskMetric {
+ override def name(): String = PaimonMetrics.NUM_WRITERS
+}
+
+// Commit metrics
+case class PaimonCommitDurationMetric() extends PaimonSumMetric {
+ override def name(): String = PaimonMetrics.COMMIT_DURATION
+ override def description(): String = "commit duration (ms)"
+}
+
+case class PaimonCommitDurationTaskMetric(value: Long) extends PaimonTaskMetric {
+ override def name(): String = PaimonMetrics.COMMIT_DURATION
+}
+
+case class PaimonAppendedTableFilesMetric() extends PaimonSumMetric {
+ override def name(): String = PaimonMetrics.APPENDED_TABLE_FILES
+ override def description(): String = "number of appended table files"
+}
+
+case class PaimonAppendedTableFilesTaskMetric(value: Long) extends PaimonTaskMetric {
+ override def name(): String = PaimonMetrics.APPENDED_TABLE_FILES
+}
+
+case class PaimonAppendedRecordsMetric() extends PaimonSumMetric {
+ override def name(): String = PaimonMetrics.APPENDED_RECORDS
+ override def description(): String = "number of appended records"
+}
+
+case class PaimonAppendedRecordsTaskMetric(value: Long) extends PaimonTaskMetric {
+ override def name(): String = PaimonMetrics.APPENDED_RECORDS
+}
+
+case class PaimonAppendedChangelogFilesMetric() extends PaimonSumMetric {
+ override def name(): String = PaimonMetrics.APPENDED_CHANGELOG_FILES
+ override def description(): String = "number of appended changelog files"
+}
+
+case class PaimonAppendedChangelogFilesTaskMetric(value: Long) extends PaimonTaskMetric {
+ override def name(): String = PaimonMetrics.APPENDED_CHANGELOG_FILES
+}
+
+case class PaimonPartitionsWrittenMetric() extends PaimonSumMetric {
+ override def name(): String = PaimonMetrics.PARTITIONS_WRITTEN
+ override def description(): String = "number of partitions written"
+}
+
+case class PaimonPartitionsWrittenTaskMetric(value: Long) extends PaimonTaskMetric {
+ override def name(): String = PaimonMetrics.PARTITIONS_WRITTEN
+}
+
+case class PaimonBucketsWrittenMetric() extends PaimonSumMetric {
+ override def name(): String = PaimonMetrics.BUCKETS_WRITTEN
+ override def description(): String = "number of buckets written"
+}
+
+case class PaimonBucketsWrittenTaskMetric(value: Long) extends PaimonTaskMetric {
+ override def name(): String = PaimonMetrics.BUCKETS_WRITTEN
+}
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/Compatibility.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/Compatibility.scala
index 1fc260fc19..443024058e 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/Compatibility.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/Compatibility.scala
@@ -18,9 +18,11 @@
package org.apache.paimon.spark.catalyst
+import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.expressions.{Cast, Expression}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, V2WriteCommand}
import org.apache.spark.sql.catalyst.trees.TreeNodeTag
+import org.apache.spark.sql.execution.ui.SQLPlanMetric
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.DataType
@@ -41,4 +43,8 @@ object Compatibility {
ansiEnabled: Boolean = SQLConf.get.ansiEnabled): Cast = {
Cast(child, dataType, timeZoneId, ansiEnabled)
}
+
+ def getExecutionMetrics(spark: SparkSession, executionId: Long): Seq[SQLPlanMetric] = {
+ spark.sharedState.statusStore.execution(executionId).get.metrics.toSeq
+ }
}
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/metric/SparkMetricRegistry.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/metric/SparkMetricRegistry.scala
index 3d858a623f..11fe639f9c 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/metric/SparkMetricRegistry.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/metric/SparkMetricRegistry.scala
@@ -18,13 +18,13 @@
package org.apache.paimon.spark.metric
-import org.apache.paimon.metrics.{Gauge, MetricGroup, MetricGroupImpl, MetricRegistry}
-import org.apache.paimon.operation.metrics.ScanMetrics
-import org.apache.paimon.spark.{PaimonPlanningDurationTaskMetric, PaimonResultedTableFilesTaskMetric, PaimonScannedManifestsTaskMetric, PaimonSkippedTableFilesTaskMetric}
+import org.apache.paimon.metrics.{Gauge, Metric, MetricGroup, MetricGroupImpl, MetricRegistry}
+import org.apache.paimon.operation.metrics.{CommitMetrics, ScanMetrics, WriterBufferMetric}
+import org.apache.paimon.spark._
import org.apache.spark.sql.connector.metric.CustomTaskMetric
-import java.util
+import java.util.{Map => JMap}
import scala.collection.mutable
@@ -34,7 +34,7 @@ case class SparkMetricRegistry() extends MetricRegistry {
override def createMetricGroup(
groupName: String,
- variables: util.Map[String, String]): MetricGroup = {
+ variables: JMap[String, String]): MetricGroup = {
val metricGroup = new MetricGroupImpl(groupName, variables)
metricGroups.put(groupName, metricGroup)
metricGroup
@@ -44,15 +44,61 @@ case class SparkMetricRegistry() extends MetricRegistry {
metricGroups.get(ScanMetrics.GROUP_NAME) match {
case Some(group) =>
val metrics = group.getMetrics
- def gaugeLong(key: String): Long = metrics.get(key).asInstanceOf[Gauge[Long]].getValue
Array(
- PaimonPlanningDurationTaskMetric(gaugeLong(ScanMetrics.LAST_SCAN_DURATION)),
- PaimonScannedManifestsTaskMetric(gaugeLong(ScanMetrics.LAST_SCANNED_MANIFESTS)),
- PaimonSkippedTableFilesTaskMetric(gaugeLong(ScanMetrics.LAST_SCAN_SKIPPED_TABLE_FILES)),
- PaimonResultedTableFilesTaskMetric(gaugeLong(ScanMetrics.LAST_SCAN_RESULTED_TABLE_FILES))
+ PaimonPlanningDurationTaskMetric(gauge[Long](metrics, ScanMetrics.LAST_SCAN_DURATION)),
+ PaimonScannedManifestsTaskMetric(
+ gauge[Long](metrics, ScanMetrics.LAST_SCANNED_MANIFESTS)),
+ PaimonSkippedTableFilesTaskMetric(
+ gauge[Long](metrics, ScanMetrics.LAST_SCAN_SKIPPED_TABLE_FILES)),
+ PaimonResultedTableFilesTaskMetric(
+ gauge[Long](metrics, ScanMetrics.LAST_SCAN_RESULTED_TABLE_FILES))
)
case None =>
Array.empty
}
}
+
+ def buildSparkWriteMetrics(): Array[CustomTaskMetric] = {
+ metricGroups.get(WriterBufferMetric.GROUP_NAME) match {
+ case Some(group) =>
+ val metrics = group.getMetrics
+ Array(
+ PaimonNumWritersTaskMetric(gauge[Int](metrics, WriterBufferMetric.NUM_WRITERS))
+ )
+ case None =>
+ Array.empty
+ }
+ }
+
+ def buildSparkCommitMetrics(): Array[CustomTaskMetric] = {
+ metricGroups.get(CommitMetrics.GROUP_NAME) match {
+ case Some(group) =>
+ val metrics = group.getMetrics
+ Array(
+ PaimonCommitDurationTaskMetric(gauge[Long](metrics, CommitMetrics.LAST_COMMIT_DURATION)),
+ PaimonAppendedTableFilesTaskMetric(
+ gauge[Long](metrics, CommitMetrics.LAST_TABLE_FILES_APPENDED)),
+ PaimonAppendedRecordsTaskMetric(
+ gauge[Long](metrics, CommitMetrics.LAST_DELTA_RECORDS_APPENDED)),
+ PaimonAppendedChangelogFilesTaskMetric(
+ gauge[Long](metrics, CommitMetrics.LAST_CHANGELOG_FILES_APPENDED)),
+ PaimonPartitionsWrittenTaskMetric(
+ gauge[Long](metrics, CommitMetrics.LAST_PARTITIONS_WRITTEN)),
+ PaimonBucketsWrittenTaskMetric(gauge[Long](metrics, CommitMetrics.LAST_BUCKETS_WRITTEN))
+ )
+ case None =>
+ Array.empty
+ }
+ }
+
+ private def gauge[T](metrics: JMap[String, Metric], key: String): T = {
+ metrics.get(key) match {
+ case null =>
+ throw new NoSuchElementException(s"Metric key '$key' not found")
+ case g: Gauge[_] =>
+ g.getValue.asInstanceOf[T]
+ case m =>
+ throw new ClassCastException(s"Expected Gauge, but got
${m.getClass.getName}")
+ }
+ }
}
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonV2Write.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonV2Write.scala
index 62a383eb7c..4589b42e8a 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonV2Write.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonV2Write.scala
@@ -20,16 +20,23 @@ package org.apache.paimon.spark.write
import org.apache.paimon.CoreOptions
import org.apache.paimon.options.Options
-import org.apache.paimon.spark.{SparkInternalRowWrapper, SparkUtils}
+import org.apache.paimon.spark.{PaimonAppendedChangelogFilesMetric, PaimonAppendedRecordsMetric, PaimonAppendedTableFilesMetric, PaimonBucketsWrittenMetric, PaimonCommitDurationMetric, PaimonNumWritersMetric, PaimonPartitionsWrittenMetric, SparkInternalRowWrapper, SparkUtils}
+import org.apache.paimon.spark.catalyst.Compatibility
import org.apache.paimon.spark.commands.SchemaHelper
+import org.apache.paimon.spark.metric.SparkMetricRegistry
import org.apache.paimon.table.FileStoreTable
import org.apache.paimon.table.sink.{BatchWriteBuilder, CommitMessage, CommitMessageSerializer, TableWriteImpl}
import org.apache.spark.internal.Logging
+import org.apache.spark.sql.PaimonSparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.connector.distributions.Distribution
import org.apache.spark.sql.connector.expressions.SortOrder
+import org.apache.spark.sql.connector.metric.{CustomMetric, CustomTaskMetric}
import org.apache.spark.sql.connector.write._
+import org.apache.spark.sql.execution.SQLExecution
+import org.apache.spark.sql.execution.metric.SQLMetrics
+import org.apache.spark.sql.execution.ui.SQLPlanMetric
import org.apache.spark.sql.types.StructType
import java.io.{IOException, UncheckedIOException}
@@ -74,6 +81,20 @@ class PaimonV2Write(
override def toBatch: BatchWrite =
PaimonBatchWrite(table, writeSchema, dataSchema, overwritePartitions)
+ override def supportedCustomMetrics(): Array[CustomMetric] = {
+ Array(
+ // write metrics
+ PaimonNumWritersMetric(),
+ // commit metrics
+ PaimonCommitDurationMetric(),
+ PaimonAppendedTableFilesMetric(),
+ PaimonAppendedRecordsMetric(),
+ PaimonAppendedChangelogFilesMetric(),
+ PaimonPartitionsWrittenMetric(),
+ PaimonBucketsWrittenMetric()
+ )
+ }
+
override def toString: String = {
val overwriteDynamicStr = if (overwriteDynamic) {
", overwriteDynamic=true"
@@ -87,6 +108,8 @@ class PaimonV2Write(
}
s"PaimonWrite(table=${table.fullName()}$overwriteDynamicStr$overwritePartitionsStr)"
}
+
+ override def description(): String = toString
}
private case class PaimonBatchWrite(
@@ -97,6 +120,8 @@ private case class PaimonBatchWrite(
extends BatchWrite
with WriteHelper {
+ private val metricRegistry = SparkMetricRegistry()
+
private val batchWriteBuilder = {
val builder = table.newBatchWriteBuilder()
overwritePartitions.foreach(partitions => builder.withOverwrite(partitions.asJava))
@@ -114,6 +139,7 @@ private case class PaimonBatchWrite(
override def commit(messages: Array[WriterCommitMessage]): Unit = {
logInfo(s"Committing to table ${table.name()}")
val batchTableCommit = batchWriteBuilder.newCommit()
+ batchTableCommit.withMetricRegistry(metricRegistry)
val commitMessages = messages
.collect {
@@ -131,12 +157,31 @@ private case class PaimonBatchWrite(
} finally {
batchTableCommit.close()
}
+ postDriverMetrics()
postCommit(commitMessages)
}
override def abort(messages: Array[WriterCommitMessage]): Unit = {
// TODO clean uncommitted files
}
+
+ // Spark support v2 write driver metrics since 4.0, see https://github.com/apache/spark/pull/48573
+ // To ensure compatibility with 3.x, manually post driver metrics here instead of using Spark's API.
+ private def postDriverMetrics(): Unit = {
+ val spark = PaimonSparkSession.active
+ // todo: find a more suitable way to get metrics.
+ val commitMetrics = metricRegistry.buildSparkCommitMetrics()
+ val executionId = spark.sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
+ val executionMetrics = Compatibility.getExecutionMetrics(spark, executionId.toLong)
+ val metricUpdates = executionMetrics.flatMap {
+ m =>
+ commitMetrics.find(x => m.metricType.toLowerCase.contains(x.name.toLowerCase)) match {
+ case Some(customTaskMetric) => Some((m.accumulatorId, customTaskMetric.value()))
+ case None => None
+ }
+ }
+ SQLMetrics.postDriverMetricsUpdatedByValue(spark.sparkContext, executionId, metricUpdates)
+ }
}
private case class WriterFactory(
@@ -147,13 +192,12 @@ private case class WriterFactory(
extends DataWriterFactory {
override def createWriter(partitionId: Int, taskId: Long): DataWriter[InternalRow] = {
- val batchTableWrite = batchWriteBuilder.newWrite().asInstanceOf[TableWriteImpl[InternalRow]]
- PaimonDataWriter(batchTableWrite, writeSchema, dataSchema, fullCompactionDeltaCommits)
+ PaimonDataWriter(batchWriteBuilder, writeSchema, dataSchema, fullCompactionDeltaCommits)
}
}
private case class PaimonDataWriter(
- write: TableWriteImpl[InternalRow],
+ writeBuilder: BatchWriteBuilder,
writeSchema: StructType,
dataSchema: StructType,
fullCompactionDeltaCommits: Option[Int],
@@ -162,7 +206,16 @@ private case class PaimonDataWriter(
with DataWriteHelper {
private val ioManager = SparkUtils.createIOManager()
- write.withIOManager(ioManager)
+
+ private val metricRegistry = SparkMetricRegistry()
+
+ val write: TableWriteImpl[InternalRow] = {
+ writeBuilder
+ .newWrite()
+ .withIOManager(ioManager)
+ .withMetricRegistry(metricRegistry)
+ .asInstanceOf[TableWriteImpl[InternalRow]]
+ }
private val rowConverter: InternalRow => SparkInternalRowWrapper = {
val numFields = writeSchema.fields.length
@@ -194,6 +247,10 @@ private case class PaimonDataWriter(
case e: Exception => throw new RuntimeException(e)
}
}
+
+ override def currentMetricsValues(): Array[CustomTaskMetric] = {
+ metricRegistry.buildSparkWriteMetrics()
+ }
}
class TaskCommit private (
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonMetricTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonMetricTest.scala
index 9656a3caa6..58883879c7 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonMetricTest.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonMetricTest.scala
@@ -26,6 +26,7 @@ import org.apache.paimon.table.source.DataSplit
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
import org.apache.spark.sql.PaimonUtils.createDataset
import org.apache.spark.sql.connector.metric.CustomTaskMetric
+import org.apache.spark.sql.execution.CommandResultExec
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation
import org.junit.jupiter.api.Assertions
@@ -110,6 +111,24 @@ class PaimonMetricTest extends PaimonSparkTestBase with ScanPlanHelper {
Assertions.assertTrue(bytesWritten > 0)
}
+ test(s"Paimon Metric: v2 write metric") {
+ withSparkSQLConf("spark.paimon.write.use-v2-write" -> "true") {
+ sql("CREATE TABLE T (id INT, name STRING, pt STRING) PARTITIONED BY
(pt)")
+ val df = sql(s"INSERT INTO T VALUES (1, 'a', 'p1'), (2, 'b', 'p2')")
+ val metrics =
+
df.queryExecution.executedPlan.asInstanceOf[CommandResultExec].commandPhysicalPlan.metrics
+ val statusStore = spark.sharedState.statusStore
+ val lastExecId = statusStore.executionsList().last.executionId
+ val executionMetrics = statusStore.executionMetrics(lastExecId)
+
+ assert(executionMetrics(metrics("appendedTableFiles").id) == "2")
+ assert(executionMetrics(metrics("appendedRecords").id) == "2")
+ assert(executionMetrics(metrics("appendedChangelogFiles").id) == "0")
+ assert(executionMetrics(metrics("partitionsWritten").id) == "2")
+ assert(executionMetrics(metrics("bucketsWritten").id) == "2")
+ }
+ }
+
def metric(metrics: Array[CustomTaskMetric], name: String): Long = {
metrics.find(_.name() == name).get.value()
}