This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new bbf1c9a3fb [spark] Add read batch time metric (#6798)
bbf1c9a3fb is described below
commit bbf1c9a3fb1013e17d4cd64ccb0fd76ec05b28e5
Author: Zouxxyy <[email protected]>
AuthorDate: Wed Dec 17 21:30:37 2025 +0800
[spark] Add read batch time metric (#6798)
---
.../paimon/table/format/FormatDataSplit.java | 1 +
.../org/apache/paimon/spark/PaimonBaseScan.scala | 1 +
.../paimon/spark/PaimonFormatTableBaseScan.scala | 1 +
.../org/apache/paimon/spark/PaimonMetrics.scala | 10 +++++++
.../paimon/spark/PaimonPartitionReader.scala | 31 +++++++++++++---------
.../paimon/spark/PaimonRecordReaderIterator.scala | 9 +++++++
.../org/apache/paimon/spark/PaimonSplitScan.scala | 1 +
7 files changed, 41 insertions(+), 13 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/format/FormatDataSplit.java b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatDataSplit.java
index e446337641..7c2ab8f901 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/format/FormatDataSplit.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatDataSplit.java
@@ -34,6 +34,7 @@ public class FormatDataSplit implements Split {
private final Path filePath;
private final long fileSize;
private final long offset;
+ // If null, means reading the whole file.
@Nullable private final Long length;
@Nullable private final BinaryRow partition;
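The new comment makes the split-sizing contract explicit: a null length means the reader consumes the whole file. A minimal sketch of resolving a split's byte count under that contract (hypothetical helper name, not part of this commit; it mirrors the FormatDataSplit branch added to PaimonPartitionReader below):

    import org.apache.paimon.table.format.FormatDataSplit

    // Hypothetical helper: number of bytes a split covers.
    // A null length means "read the whole file".
    def splitBytes(split: FormatDataSplit): Long =
      if (split.length() == null) split.fileSize() else split.length().longValue()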
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScan.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScan.scala
index f39f209415..c0b6717269 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScan.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScan.scala
@@ -93,6 +93,7 @@ abstract class PaimonBaseScan(table: InnerTable)
Array(
PaimonNumSplitMetric(),
PaimonPartitionSizeMetric(),
+ PaimonReadBatchTimeMetric(),
PaimonPlanningDurationMetric(),
PaimonScannedSnapshotIdMetric(),
PaimonScannedManifestsMetric(),
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonFormatTableBaseScan.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonFormatTableBaseScan.scala
index 793e15540a..0d84664639 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonFormatTableBaseScan.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonFormatTableBaseScan.scala
@@ -66,6 +66,7 @@ abstract class PaimonFormatTableBaseScan
Array(
PaimonNumSplitMetric(),
PaimonPartitionSizeMetric(),
+ PaimonReadBatchTimeMetric(),
PaimonResultedTableFilesMetric()
)
}
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonMetrics.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonMetrics.scala
index 3f4ff179ce..dadd43f5da 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonMetrics.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonMetrics.scala
@@ -25,6 +25,7 @@ object PaimonMetrics {
// scan metrics
val NUM_SPLITS = "numSplits"
val PARTITION_SIZE = "partitionSize"
+ val READ_BATCH_TIME = "readBatchTime"
val PLANNING_DURATION = "planningDuration"
val SCANNED_SNAPSHOT_ID = "scannedSnapshotId"
val SCANNED_MANIFESTS = "scannedManifests"
@@ -108,6 +109,15 @@ case class PaimonPartitionSizeTaskMetric(override val value: Long) extends Paimo
override def name(): String = PaimonMetrics.PARTITION_SIZE
}
+case class PaimonReadBatchTimeMetric() extends PaimonTimingSummaryMetric {
+ override def name(): String = PaimonMetrics.READ_BATCH_TIME
+ override def description0(): String = "read batch time"
+}
+
+case class PaimonReadBatchTimeTaskMetric(value: Long) extends PaimonTaskMetric {
+ override def name(): String = PaimonMetrics.READ_BATCH_TIME
+}
+
case class PaimonPlanningDurationMetric() extends PaimonTimingSumMetric {
override def name(): String = PaimonMetrics.PLANNING_DURATION
override def description(): String = "planing duration"
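The new pair follows Spark's DataSource V2 metrics contract: a driver-side metric declares the name, description, and aggregation, while each task reports a raw Long under the same name. A sketch of that pairing against the raw Spark interfaces (Paimon's PaimonTimingSummaryMetric and PaimonTaskMetric wrap these; the aggregation format below is an assumption, not Paimon's exact output):

    import org.apache.spark.sql.connector.metric.{CustomMetric, CustomTaskMetric}

    // Driver side: Spark collects the Long values from all tasks and calls
    // aggregateTaskMetrics to render the value shown in the UI.
    class ReadBatchTimeMetric extends CustomMetric {
      override def name(): String = "readBatchTime"
      override def description(): String = "read batch time"
      override def aggregateTaskMetrics(taskMetrics: Array[Long]): String =
        if (taskMetrics.isEmpty) "0 ms"
        else s"total: ${taskMetrics.sum} ms, max: ${taskMetrics.max} ms"
    }

    // Task side: the name must match the driver-side metric so Spark can
    // link the per-task values to it.
    case class ReadBatchTimeTaskMetric(value: Long) extends CustomTaskMetric {
      override def name(): String = "readBatchTime"
    }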
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionReader.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionReader.scala
index 2ca21a15e1..2040982687 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionReader.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionReader.scala
@@ -23,6 +23,7 @@ import org.apache.paimon.disk.IOManager
import org.apache.paimon.spark.SparkUtils.createIOManager
import org.apache.paimon.spark.data.SparkInternalRow
import org.apache.paimon.spark.schema.PaimonMetadataColumn
+import org.apache.paimon.table.format.FormatDataSplit
import org.apache.paimon.table.source.{DataSplit, ReadBuilder, Split}
import org.apache.paimon.types.RowType
@@ -54,6 +55,7 @@ case class PaimonPartitionReader(
val rowType = new RowType(dataFields)
SparkInternalRow.create(rowType, blobAsDescriptor)
}
+ private var totalReadBatchTimeMs: Long = 0L
private lazy val read = readBuilder.newRead().withIOManager(ioManager)
@@ -89,6 +91,7 @@ case class PaimonPartitionReader(
if (currentRow != null) {
stop = true
} else {
+ totalReadBatchTimeMs += currentRecordReader.readBatchTimeMs
currentRecordReader.close()
currentRecordReader = readSplit()
if (currentRecordReader == null) {
@@ -101,7 +104,7 @@ case class PaimonPartitionReader(
private def readSplit(): PaimonRecordReaderIterator = {
if (splits.hasNext) {
- val split = splits.next();
+ val split = splits.next()
PaimonRecordReaderIterator(read.createReader(split), metadataColumns, split)
} else {
null
@@ -110,26 +113,28 @@ case class PaimonPartitionReader(
// Partition metrics need to be computed only once.
private lazy val partitionMetrics: Array[CustomTaskMetric] = {
- val dataSplits = partition.splits.collect { case ds: DataSplit => ds }
- val numSplits = dataSplits.length
- if (dataSplits.nonEmpty) {
- val splitSize = dataSplits.map(_.dataFiles().asScala.map(_.fileSize).sum).sum
- Array(
- PaimonNumSplitsTaskMetric(numSplits),
- PaimonPartitionSizeTaskMetric(splitSize)
- )
- } else {
- Array.empty[CustomTaskMetric]
- }
+ val numSplits = partition.splits.length
+ val splitSize = partition.splits.map {
+ case ds: DataSplit => ds.dataFiles().asScala.map(_.fileSize).sum
+ case fs: FormatDataSplit =>
+ if (fs.length() == null) fs.fileSize() else fs.length().longValue()
+ case _ => 0
+ }.sum
+
+ Array(
+ PaimonNumSplitsTaskMetric(numSplits),
+ PaimonPartitionSizeTaskMetric(splitSize)
+ )
}
override def currentMetricsValues(): Array[CustomTaskMetric] = {
- partitionMetrics
+ partitionMetrics ++ Array(PaimonReadBatchTimeTaskMetric(totalReadBatchTimeMs))
}
override def close(): Unit = {
try {
if (currentRecordReader != null) {
+ totalReadBatchTimeMs += currentRecordReader.readBatchTimeMs
currentRecordReader.close()
}
} finally {
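Two points in this reader are easy to miss: partition size now also covers FormatDataSplit (whole file size when length is null), and the batch-read time is flushed into totalReadBatchTimeMs both when rotating to the next split and in close(), so the final still-open reader's time is not lost. A condensed sketch of that flush-before-close pattern (names hypothetical):

    // Sketch only: each reader tracks its own elapsed time; the partition
    // reader folds it into a running total right before closing the reader.
    object FlushBeforeClose {
      trait TimedReader extends AutoCloseable { def elapsedMs: Long }

      private var totalMs = 0L

      def closeAndAccumulate(reader: TimedReader): Unit = {
        totalMs += reader.elapsedMs // flush first, then close
        reader.close()
      }

      def total: Long = totalMs
    }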
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonRecordReaderIterator.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonRecordReaderIterator.scala
index 6928894e6b..16a05287ac 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonRecordReaderIterator.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonRecordReaderIterator.scala
@@ -29,6 +29,7 @@ import org.apache.paimon.utils.CloseableIterator
import org.apache.spark.sql.PaimonUtils
import java.io.IOException
+import java.util.concurrent.TimeUnit.NANOSECONDS
case class PaimonRecordReaderIterator(
reader: RecordReader[PaimonInternalRow],
@@ -56,6 +57,7 @@ case class PaimonRecordReaderIterator(
private var currentIterator: RecordReader.RecordIterator[PaimonInternalRow] = readBatch()
private var advanced = false
private var currentResult: PaimonInternalRow = _
+ private var readBatchTimeNs: Long = 0L
override def hasNext: Boolean = {
if (currentIterator == null) {
@@ -88,6 +90,8 @@ case class PaimonRecordReaderIterator(
}
private def readBatch(): RecordReader.RecordIterator[PaimonInternalRow] = {
+ val startTimeNs = System.nanoTime()
+
val iter = reader.readBatch()
iter match {
case fileRecordIterator: FileRecordIterator[_] =>
@@ -102,9 +106,14 @@ case class PaimonRecordReaderIterator(
"Only append table or deletion vector table support querying
metadata columns.")
}
}
+ readBatchTimeNs += System.nanoTime() - startTimeNs
iter
}
+ def readBatchTimeMs: Long = {
+ NANOSECONDS.toMillis(readBatchTimeNs)
+ }
+
private def advanceIfNeeded(): Unit = {
if (!advanced) {
advanced = true
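The timing hook is deliberately narrow: only the reader.readBatch() call is measured, accumulated in nanoseconds, and exposed in milliseconds, so per-row iteration cost inside a batch is excluded. A generic version of the same pattern (a sketch, not Paimon API):

    import java.util.concurrent.TimeUnit.NANOSECONDS

    // Sketch: accumulate wall-clock time spent inside a call across many
    // invocations and expose it in milliseconds, as the iterator above
    // does for readBatch().
    final class TimedCalls[T](call: () => T) {
      private var elapsedNs = 0L
      def apply(): T = {
        val start = System.nanoTime()
        try call() finally elapsedNs += System.nanoTime() - start
      }
      def elapsedMs: Long = NANOSECONDS.toMillis(elapsedNs)
    }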
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonSplitScan.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonSplitScan.scala
index f47081e5b2..f5998ff73b 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonSplitScan.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonSplitScan.scala
@@ -61,6 +61,7 @@ case class PaimonSplitScan(
Array(
PaimonNumSplitMetric(),
PaimonPartitionSizeMetric(),
+ PaimonReadBatchTimeMetric(),
PaimonResultedTableFilesMetric()
)
}