[ https://issues.apache.org/jira/browse/SPARK-37585?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17488597#comment-17488597 ]
Apache Spark commented on SPARK-37585:
--------------------------------------

User 'bozhang2820' has created a pull request for this issue: https://github.com/apache/spark/pull/35432

> DSV2 InputMetrics are not getting updated in corner case
> --------------------------------------------------------
>
>                 Key: SPARK-37585
>                 URL: https://issues.apache.org/jira/browse/SPARK-37585
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 3.0.3, 3.1.2
>            Reporter: Sandeep Katta
>            Priority: Major
>
> In some corner cases, DSV2 does not update the input metrics.
>
> This is a very specific case: it occurs when the number of records read is less than 1000 and *hasNext* is not called after the last element (the consumer stops early, so MetricsIterator.hasNext never returns false and the forced metrics update is skipped).
>
> hasNext implementation of MetricsIterator:
>
> {code:java}
> override def hasNext: Boolean = {
>   if (iter.hasNext) {
>     true
>   } else {
>     metricsHandler.updateMetrics(0, force = true)
>     false
>   }
> }
> {code}
>
> You can reproduce this issue easily in spark-shell by running the code below:
>
> {code:java}
> import scala.collection.mutable
> import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
>
> spark.conf.set("spark.sql.sources.useV1SourceList", "")
>
> val dir = "Users/tmp1"
> spark.range(0, 100).write.format("parquet").mode("overwrite").save(dir)
> val df = spark.read.format("parquet").load(dir)
>
> val bytesReads = new mutable.ArrayBuffer[Long]()
> val recordsRead = new mutable.ArrayBuffer[Long]()
> val bytesReadListener = new SparkListener() {
>   override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
>     bytesReads += taskEnd.taskMetrics.inputMetrics.bytesRead
>     recordsRead += taskEnd.taskMetrics.inputMetrics.recordsRead
>   }
> }
> spark.sparkContext.addSparkListener(bytesReadListener)
> try {
>   df.limit(10).collect()
>   assert(recordsRead.sum > 0)
>   assert(bytesReads.sum > 0)
> } finally {
>   spark.sparkContext.removeSparkListener(bytesReadListener)
> }
> {code}
>
> This code generally fails at *assert(bytesReads.sum > 0)*, which confirms that the updateMetrics API is not called.

--
This message was sent by Atlassian Jira
(v8.20.1#820001)
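For readers of this thread, the failure mode can be shown without Spark at all. The sketch below uses a hypothetical `CountingIterator` as a simplified stand-in for Spark's MetricsIterator (it records a boolean instead of calling `metricsHandler.updateMetrics`); it only illustrates why a limit-style consumer that stops early never reaches the exhausted branch of `hasNext`:

```scala
// Hypothetical, simplified stand-in for Spark's MetricsIterator (illustration
// only): instead of calling metricsHandler.updateMetrics(0, force = true), we
// record whether that forced flush would have happened.
class CountingIterator[T](iter: Iterator[T]) extends Iterator[T] {
  var flushed = false

  override def hasNext: Boolean = {
    if (iter.hasNext) {
      true
    } else {
      flushed = true // stands in for the forced updateMetrics call
      false
    }
  }

  override def next(): T = iter.next()
}

// A limit-style consumer pulls a fixed number of elements and never observes
// iterator exhaustion, so the flush branch is skipped.
val partial = new CountingIterator(Iterator(1, 2, 3))
val firstTwo = List(partial.next(), partial.next())
println(partial.flushed) // false: metrics were never force-updated

// Draining the iterator via hasNext does reach the flush branch.
val drained = new CountingIterator(Iterator(1, 2, 3))
while (drained.hasNext) drained.next()
println(drained.flushed) // true
```

Note that even a consumer which calls `hasNext` before every `next()` hits the same problem if it stops before exhaustion: `iter.hasNext` is still true at that point, so the else branch, and the forced metrics update, never runs.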