This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new bbf1c9a3fb [spark] Add read batch time metric (#6798)
bbf1c9a3fb is described below

commit bbf1c9a3fb1013e17d4cd64ccb0fd76ec05b28e5
Author: Zouxxyy <[email protected]>
AuthorDate: Wed Dec 17 21:30:37 2025 +0800

    [spark] Add read batch time metric (#6798)
---
 .../paimon/table/format/FormatDataSplit.java       |  1 +
 .../org/apache/paimon/spark/PaimonBaseScan.scala   |  1 +
 .../paimon/spark/PaimonFormatTableBaseScan.scala   |  1 +
 .../org/apache/paimon/spark/PaimonMetrics.scala    | 10 +++++++
 .../paimon/spark/PaimonPartitionReader.scala       | 31 +++++++++++++---------
 .../paimon/spark/PaimonRecordReaderIterator.scala  |  9 +++++++
 .../org/apache/paimon/spark/PaimonSplitScan.scala  |  1 +
 7 files changed, 41 insertions(+), 13 deletions(-)
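
The core of the change: each reader.readBatch() call in PaimonRecordReaderIterator is wrapped with a wall-clock timer, the elapsed nanoseconds are accumulated per reader, PaimonPartitionReader sums the per-reader totals, and the sum is reported in milliseconds as the new readBatchTime task metric. A minimal sketch of that timing pattern, using a hypothetical wrapper class rather than the actual Paimon types:

    import java.util.concurrent.TimeUnit.NANOSECONDS

    // Hypothetical wrapper (not a Paimon class) showing the pattern applied in
    // this patch: time each readBatch() call, accumulate nanoseconds, and
    // expose the total in milliseconds for the task metric.
    final class TimedBatchReader[T](readNextBatch: () => Option[Seq[T]]) {
      private var readBatchTimeNs: Long = 0L

      def readBatch(): Option[Seq[T]] = {
        val startTimeNs = System.nanoTime()
        val batch = readNextBatch()
        readBatchTimeNs += System.nanoTime() - startTimeNs
        batch
      }

      // Reported once per task as the readBatchTime metric value.
      def readBatchTimeMs: Long = NANOSECONDS.toMillis(readBatchTimeNs)
    }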

diff --git a/paimon-core/src/main/java/org/apache/paimon/table/format/FormatDataSplit.java b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatDataSplit.java
index e446337641..7c2ab8f901 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/format/FormatDataSplit.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatDataSplit.java
@@ -34,6 +34,7 @@ public class FormatDataSplit implements Split {
     private final Path filePath;
     private final long fileSize;
     private final long offset;
+    // If null, means reading the whole file.
     @Nullable private final Long length;
     @Nullable private final BinaryRow partition;
 
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScan.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScan.scala
index f39f209415..c0b6717269 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScan.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScan.scala
@@ -93,6 +93,7 @@ abstract class PaimonBaseScan(table: InnerTable)
     Array(
       PaimonNumSplitMetric(),
       PaimonPartitionSizeMetric(),
+      PaimonReadBatchTimeMetric(),
       PaimonPlanningDurationMetric(),
       PaimonScannedSnapshotIdMetric(),
       PaimonScannedManifestsMetric(),
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonFormatTableBaseScan.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonFormatTableBaseScan.scala
index 793e15540a..0d84664639 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonFormatTableBaseScan.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonFormatTableBaseScan.scala
@@ -66,6 +66,7 @@ abstract class PaimonFormatTableBaseScan
     Array(
       PaimonNumSplitMetric(),
       PaimonPartitionSizeMetric(),
+      PaimonReadBatchTimeMetric(),
       PaimonResultedTableFilesMetric()
     )
   }
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonMetrics.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonMetrics.scala
index 3f4ff179ce..dadd43f5da 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonMetrics.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonMetrics.scala
@@ -25,6 +25,7 @@ object PaimonMetrics {
   // scan metrics
   val NUM_SPLITS = "numSplits"
   val PARTITION_SIZE = "partitionSize"
+  val READ_BATCH_TIME = "readBatchTime"
   val PLANNING_DURATION = "planningDuration"
   val SCANNED_SNAPSHOT_ID = "scannedSnapshotId"
   val SCANNED_MANIFESTS = "scannedManifests"
@@ -108,6 +109,15 @@ case class PaimonPartitionSizeTaskMetric(override val value: Long) extends Paimo
   override def name(): String = PaimonMetrics.PARTITION_SIZE
 }
 
+case class PaimonReadBatchTimeMetric() extends PaimonTimingSummaryMetric {
+  override def name(): String = PaimonMetrics.READ_BATCH_TIME
+  override def description0(): String = "read batch time"
+}
+
+case class PaimonReadBatchTimeTaskMetric(value: Long) extends PaimonTaskMetric {
+  override def name(): String = PaimonMetrics.READ_BATCH_TIME
+}
+
 case class PaimonPlanningDurationMetric() extends PaimonTimingSumMetric {
   override def name(): String = PaimonMetrics.PLANNING_DURATION
   override def description(): String = "planing duration"
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionReader.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionReader.scala
index 2ca21a15e1..2040982687 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionReader.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionReader.scala
@@ -23,6 +23,7 @@ import org.apache.paimon.disk.IOManager
 import org.apache.paimon.spark.SparkUtils.createIOManager
 import org.apache.paimon.spark.data.SparkInternalRow
 import org.apache.paimon.spark.schema.PaimonMetadataColumn
+import org.apache.paimon.table.format.FormatDataSplit
 import org.apache.paimon.table.source.{DataSplit, ReadBuilder, Split}
 import org.apache.paimon.types.RowType
 
@@ -54,6 +55,7 @@ case class PaimonPartitionReader(
     val rowType = new RowType(dataFields)
     SparkInternalRow.create(rowType, blobAsDescriptor)
   }
+  private var totalReadBatchTimeMs: Long = 0L
 
   private lazy val read = readBuilder.newRead().withIOManager(ioManager)
 
@@ -89,6 +91,7 @@ case class PaimonPartitionReader(
         if (currentRow != null) {
           stop = true
         } else {
+          totalReadBatchTimeMs += currentRecordReader.readBatchTimeMs
           currentRecordReader.close()
           currentRecordReader = readSplit()
           if (currentRecordReader == null) {
@@ -101,7 +104,7 @@ case class PaimonPartitionReader(
 
   private def readSplit(): PaimonRecordReaderIterator = {
     if (splits.hasNext) {
-      val split = splits.next();
+      val split = splits.next()
       PaimonRecordReaderIterator(read.createReader(split), metadataColumns, split)
     } else {
       null
@@ -110,26 +113,28 @@ case class PaimonPartitionReader(
 
   // Partition metrics need to be computed only once.
   private lazy val partitionMetrics: Array[CustomTaskMetric] = {
-    val dataSplits = partition.splits.collect { case ds: DataSplit => ds }
-    val numSplits = dataSplits.length
-    if (dataSplits.nonEmpty) {
-      val splitSize = dataSplits.map(_.dataFiles().asScala.map(_.fileSize).sum).sum
-      Array(
-        PaimonNumSplitsTaskMetric(numSplits),
-        PaimonPartitionSizeTaskMetric(splitSize)
-      )
-    } else {
-      Array.empty[CustomTaskMetric]
-    }
+    val numSplits = partition.splits.length
+    val splitSize = partition.splits.map {
+      case ds: DataSplit => ds.dataFiles().asScala.map(_.fileSize).sum
+      case fs: FormatDataSplit =>
+        if (fs.length() == null) fs.fileSize() else fs.length().longValue()
+      case _ => 0
+    }.sum
+
+    Array(
+      PaimonNumSplitsTaskMetric(numSplits),
+      PaimonPartitionSizeTaskMetric(splitSize)
+    )
   }
 
   override def currentMetricsValues(): Array[CustomTaskMetric] = {
-    partitionMetrics
+    partitionMetrics ++ Array(PaimonReadBatchTimeTaskMetric(totalReadBatchTimeMs))
   }
 
   override def close(): Unit = {
     try {
       if (currentRecordReader != null) {
+        totalReadBatchTimeMs += currentRecordReader.readBatchTimeMs
         currentRecordReader.close()
       }
     } finally {
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonRecordReaderIterator.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonRecordReaderIterator.scala
index 6928894e6b..16a05287ac 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonRecordReaderIterator.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonRecordReaderIterator.scala
@@ -29,6 +29,7 @@ import org.apache.paimon.utils.CloseableIterator
 import org.apache.spark.sql.PaimonUtils
 
 import java.io.IOException
+import java.util.concurrent.TimeUnit.NANOSECONDS
 
 case class PaimonRecordReaderIterator(
     reader: RecordReader[PaimonInternalRow],
@@ -56,6 +57,7 @@ case class PaimonRecordReaderIterator(
   private var currentIterator: RecordReader.RecordIterator[PaimonInternalRow] = readBatch()
   private var advanced = false
   private var currentResult: PaimonInternalRow = _
+  private var readBatchTimeNs: Long = 0L
 
   override def hasNext: Boolean = {
     if (currentIterator == null) {
@@ -88,6 +90,8 @@ case class PaimonRecordReaderIterator(
   }
 
   private def readBatch(): RecordReader.RecordIterator[PaimonInternalRow] = {
+    val startTimeNs = System.nanoTime()
+
     val iter = reader.readBatch()
     iter match {
       case fileRecordIterator: FileRecordIterator[_] =>
@@ -102,9 +106,14 @@ case class PaimonRecordReaderIterator(
               "Only append table or deletion vector table support querying 
metadata columns.")
         }
     }
+    readBatchTimeNs += System.nanoTime() - startTimeNs
     iter
   }
 
+  def readBatchTimeMs: Long = {
+    NANOSECONDS.toMillis(readBatchTimeNs)
+  }
+
   private def advanceIfNeeded(): Unit = {
     if (!advanced) {
       advanced = true
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonSplitScan.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonSplitScan.scala
index f47081e5b2..f5998ff73b 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonSplitScan.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonSplitScan.scala
@@ -61,6 +61,7 @@ case class PaimonSplitScan(
     Array(
       PaimonNumSplitMetric(),
       PaimonPartitionSizeMetric(),
+      PaimonReadBatchTimeMetric(),
       PaimonResultedTableFilesMetric()
     )
   }
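
For background, the Paimon metric classes touched here build on Spark's DataSource V2 metric API, where a driver-side CustomMetric is paired by name with per-task CustomTaskMetric values. A rough sketch of an equivalent pair written directly against the Spark interfaces, with illustrative names and a simple sum-based aggregation (an assumption, not the Paimon implementation):

    import org.apache.spark.sql.connector.metric.{CustomMetric, CustomTaskMetric}

    // Driver-side metric: Spark matches it to task values by name and calls
    // aggregateTaskMetrics to render the value in the SQL UI.
    class ReadBatchTimeMetric extends CustomMetric {
      override def name(): String = "readBatchTime"
      override def description(): String = "read batch time"
      override def aggregateTaskMetrics(taskMetrics: Array[Long]): String =
        s"total: ${taskMetrics.sum} ms, tasks: ${taskMetrics.length}"
    }

    // Executor-side metric: one value per task, reported from
    // PartitionReader.currentMetricsValues().
    case class ReadBatchTimeTaskMetric(timeMs: Long) extends CustomTaskMetric {
      override def name(): String = "readBatchTime"
      override def value(): Long = timeMs
    }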
