[ https://issues.apache.org/jira/browse/SPARK-37585?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17488597#comment-17488597 ]

Apache Spark commented on SPARK-37585:
--------------------------------------

User 'bozhang2820' has created a pull request for this issue:
https://github.com/apache/spark/pull/35432

> DSV2 InputMetrics are not getting updated in a corner case
> ----------------------------------------------------------
>
>                 Key: SPARK-37585
>                 URL: https://issues.apache.org/jira/browse/SPARK-37585
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 3.0.3, 3.1.2
>            Reporter: Sandeep Katta
>            Priority: Major
>
> In some corner cases, DSV2 does not update the input metrics.
>  
> This is a very special case where the number of records read is less than 1000 
> and *hasNext* is not called for the last element (because input.hasNext returns 
> false, MetricsIterator.hasNext is not called).
>  
> The hasNext implementation of MetricsIterator:
>  
> {code:java}
> override def hasNext: Boolean = {
>   if (iter.hasNext) {
>     true
>   } else {
>     metricsHandler.updateMetrics(0, force = true)
>     false
>   }
> }
> {code}
>  
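> Below is a minimal, self-contained sketch (plain Scala, no Spark involved; 
> MetricsFlushDemo and the flushed flag are illustrative names only) of why that 
> else branch can be skipped: when the consumer only takes a prefix of the rows, 
> as limit does, hasNext is never called again after the last needed element, so 
> the forced update never runs.
> {code:java}
> object MetricsFlushDemo {
>   def main(args: Array[String]): Unit = {
>     var flushed = false
> 
>     val underlying = Iterator.range(0, 100)
>     // Wrapper that, like MetricsIterator, only "flushes" from hasNext
>     // once the underlying iterator is exhausted.
>     val wrapped = new Iterator[Int] {
>       override def hasNext: Boolean = {
>         if (underlying.hasNext) {
>           true
>         } else {
>           flushed = true // stands in for updateMetrics(0, force = true)
>           false
>         }
>       }
>       override def next(): Int = underlying.next()
>     }
> 
>     // The consumer stops after 10 rows and never calls hasNext again,
>     // so the flush branch above is never reached.
>     val rows = wrapped.take(10).toList
>     println(s"rows read = ${rows.size}, flushed = $flushed") // prints flushed = false
>   }
> }
> {code}
>  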
> You can reproduce this issue easily in spark-shell by running the code below:
> {code:java}
> import scala.collection.mutable
> import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
> 
> // Route parquet through DSV2 by clearing the V1 source list.
> spark.conf.set("spark.sql.sources.useV1SourceList", "")
> 
> val dir = "Users/tmp1"
> spark.range(0, 100).write.format("parquet").mode("overwrite").save(dir)
> val df = spark.read.format("parquet").load(dir)
> 
> // Collect the task-level input metrics reported to the listener bus.
> val bytesReads = new mutable.ArrayBuffer[Long]()
> val recordsRead = new mutable.ArrayBuffer[Long]()
> val bytesReadListener = new SparkListener() {
>   override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
>     bytesReads += taskEnd.taskMetrics.inputMetrics.bytesRead
>     recordsRead += taskEnd.taskMetrics.inputMetrics.recordsRead
>   }
> }
> 
> spark.sparkContext.addSparkListener(bytesReadListener)
> try {
>   // Fewer than 1000 rows are read and the scan stops early, hitting the corner case.
>   df.limit(10).collect()
>   assert(recordsRead.sum > 0)
>   assert(bytesReads.sum > 0)
> } finally {
>   spark.sparkContext.removeSparkListener(bytesReadListener)
> }
> {code}
> This code generally fails at *assert(bytesReads.sum > 0)*, which confirms that 
> the updateMetrics API is not called.
>  
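> One possible direction, shown only as a rough sketch (this is not necessarily 
> what the linked pull request does, and the MetricsHandler trait below is a 
> simplified stand-in for the real DSV2 helper), is to stop relying on the 
> hasNext branch and instead force a final updateMetrics call when the task 
> completes:
> {code:java}
> import org.apache.spark.TaskContext
> 
> // Simplified stand-in for the DSV2 metrics helper seen above.
> trait MetricsHandler {
>   def updateMetrics(numRows: Int, force: Boolean = false): Unit
> }
> 
> class MetricsIterator[T](iter: Iterator[T], metricsHandler: MetricsHandler)
>   extends Iterator[T] {
> 
>   // Register the flush up front so it runs even if hasNext is never
>   // called after the last element (e.g. under limit).
>   Option(TaskContext.get()).foreach { ctx =>
>     ctx.addTaskCompletionListener[Unit] { _ =>
>       metricsHandler.updateMetrics(0, force = true)
>     }
>   }
> 
>   override def hasNext: Boolean = iter.hasNext
> 
>   override def next(): T = {
>     val v = iter.next()
>     metricsHandler.updateMetrics(1)
>     v
>   }
> }
> {code}
> With something like this, the final flush no longer depends on how far the 
> consumer iterates.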



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
