This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 11f0e7c797ed feat(spark): add input records & bytes metrics (#18882)
11f0e7c797ed is described below
commit 11f0e7c797ed3740be20c047892e39ce53627bd8
Author: fhan <[email protected]>
AuthorDate: Tue Jun 2 01:47:54 2026 +0800
feat(spark): add input records & bytes metrics (#18882)
Co-authored-by: fhan <[email protected]>
---
.../org/apache/hudi/HoodieMergeOnReadRDDV2.scala | 45 ++++++++++++++++------
.../spark/HoodieSparkInputMetricsUtils.scala | 40 +++++++++++++++++++
2 files changed, 74 insertions(+), 11 deletions(-)
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDDV2.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDDV2.scala
index 3d9b908c4d31..0104136f8424 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDDV2.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDDV2.scala
@@ -42,7 +42,7 @@ import
org.apache.hudi.storage.hadoop.HadoopStorageConfiguration
import org.apache.avro.generic.IndexedRecord
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapred.JobConf
-import org.apache.spark.{Partition, SerializableWritable, SparkContext,
TaskContext}
+import org.apache.spark.{HoodieSparkInputMetricsUtils, Partition,
SerializableWritable, SparkContext, TaskContext}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
@@ -146,6 +146,7 @@ class HoodieMergeOnReadRDDV2(@transient sc: SparkContext,
override def compute(split: Partition, context: TaskContext):
Iterator[InternalRow] = {
val partition = split.asInstanceOf[HoodieMergeOnReadPartition]
+ val bytesReadCallback =
HoodieSparkInputMetricsUtils.getFSBytesReadOnThreadCallback()
val iter: Iterator[InternalRow] = partition.split match {
case dataFileOnlySplit if dataFileOnlySplit.logFiles.isEmpty =>
@@ -203,15 +204,9 @@ class HoodieMergeOnReadRDDV2(@transient sc: SparkContext,
}
}
- if (iter.isInstanceOf[Closeable]) {
- // register a callback to close logScanner which will be executed on
task completion.
- // when tasks finished, this method will be called, and release
resources.
- Option(TaskContext.get()).foreach(_.addTaskCompletionListener[Unit](_ =>
iter.asInstanceOf[Closeable].close()))
- }
-
val commitTimeMetadataFieldIdx =
requiredSchema.structTypeSchema.fieldNames.indexOf(HoodieRecord.COMMIT_TIME_METADATA_FIELD)
val needsFiltering = commitTimeMetadataFieldIdx >= 0 &&
includedInstantTimeSet.isDefined
- if (needsFiltering) {
+ val resultIter = if (needsFiltering) {
val filterT: Predicate[InternalRow] = new Predicate[InternalRow] {
override def test(row: InternalRow): Boolean = {
val commitTime = row.getString(commitTimeMetadataFieldIdx)
@@ -219,10 +214,11 @@ class HoodieMergeOnReadRDDV2(@transient sc: SparkContext,
}
}
iter.filter(filterT.test)
- }
- else {
+ } else {
iter
}
+
+ withInputMetrics(resultIter, iter, context, bytesReadCallback)
}
override protected def getPartitions: Array[Partition] =
@@ -259,6 +255,34 @@ class HoodieMergeOnReadRDDV2(@transient sc: SparkContext,
}
}
+ /**
+  * Wraps a row iterator so that Spark input metrics are reported for the task.
+  *
+  * Every row handed out by `next()` bumps the records-read metric by one. A task
+  * completion listener adds the value returned by `bytesReadCallback` to the
+  * bytes-read metric and then closes the wrapper, which closes `closeableIter`
+  * when that iterator implements `Closeable`.
+  *
+  * @param iter              iterator whose rows are returned to the caller
+  * @param closeableIter     iterator to release on close; ignored unless it is a `Closeable`
+  * @param context           task context used for metric updates and the completion listener
+  * @param bytesReadCallback invoked once, at task completion, to obtain the bytes-read value
+  * @return an iterator over the rows of `iter` that also implements `Closeable`
+  */
+ private def withInputMetrics(iter: Iterator[InternalRow],
+                              closeableIter: Iterator[InternalRow],
+                              context: TaskContext,
+                              bytesReadCallback: () => Long): Iterator[InternalRow] = {
+   val metricIter = new Iterator[InternalRow] with Closeable {
+     override def hasNext: Boolean = iter.hasNext
+
+     override def next(): InternalRow = {
+       val row = iter.next()
+       HoodieSparkInputMetricsUtils.incRecordsRead(context, 1)
+       row
+     }
+
+     override def close(): Unit = {
+       closeableIter match {
+         case closeable: Closeable => closeable.close()
+         case _ => // nothing to release for a non-closeable iterator
+       }
+     }
+   }
+
+   context.addTaskCompletionListener[Unit] { _ =>
+     // Always release the underlying iterator, even if reporting the bytes metric throws.
+     try {
+       HoodieSparkInputMetricsUtils.incBytesRead(context, bytesReadCallback())
+     } finally {
+       metricIter.close()
+     }
+   }
+   metricIter
+ }
+
private def getPartitionPath(split: HoodieMergeOnReadFileSplit): StoragePath
= {
// Determine partition path as an immediate parent folder of either
// - The base file
@@ -273,4 +297,3 @@ class HoodieMergeOnReadRDDV2(@transient sc: SparkContext,
object HoodieMergeOnReadRDDV2 {
val CONFIG_INSTANTIATION_LOCK = new Object()
}
-
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/HoodieSparkInputMetricsUtils.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/HoodieSparkInputMetricsUtils.scala
new file mode 100644
index 000000000000..4a3210bde126
--- /dev/null
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/HoodieSparkInputMetricsUtils.scala
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import org.apache.spark.deploy.SparkHadoopUtil
+
+/**
+ * Small helpers for reporting Spark task input metrics.
+ *
+ * NOTE(review): this object is declared in the `org.apache.spark` package, presumably
+ * so it can reach Spark members that are not visible from other packages - confirm.
+ */
+object HoodieSparkInputMetricsUtils {
+
+  /** Returns the callback obtained from `SparkHadoopUtil.get.getFSBytesReadOnThreadCallback()`. */
+  def getFSBytesReadOnThreadCallback(): () => Long =
+    SparkHadoopUtil.get.getFSBytesReadOnThreadCallback()
+
+  /** Adds `count` to the task's records-read input metric; does nothing when `taskContext` is null. */
+  def incRecordsRead(taskContext: TaskContext, count: Long): Unit =
+    Option(taskContext).foreach { ctx =>
+      ctx.taskMetrics().inputMetrics.incRecordsRead(count)
+    }
+
+  /** Adds `bytes` to the task's bytes-read input metric; does nothing when `taskContext` is null. */
+  def incBytesRead(taskContext: TaskContext, bytes: Long): Unit =
+    Option(taskContext).foreach { ctx =>
+      ctx.taskMetrics().inputMetrics.incBytesRead(bytes)
+    }
+}