[ https://issues.apache.org/jira/browse/SPARK-21009?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16041055#comment-16041055 ]

Bogdan Raducanu commented on SPARK-21009:
-----------------------------------------

Yes, this looks like a duplicate. I posted the repro code in that issue, so I'll
close this one.

> SparkListenerTaskEnd.taskInfo.accumulables might not be accurate
> ----------------------------------------------------------------
>
>                 Key: SPARK-21009
>                 URL: https://issues.apache.org/jira/browse/SPARK-21009
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.2.0
>            Reporter: Bogdan Raducanu
>
> The following code reproduces it:
> {code}
>   import scala.collection.mutable
>
>   import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
>
>   test("task end event should carry accumulables") {
>     val foundMetrics = mutable.Set.empty[String]
>     spark.sparkContext.addSparkListener(new SparkListener {
>       override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
>         // Record the name of every accumulable attached to the task end event.
>         taskEnd.taskInfo.accumulables.foreach { a =>
>           if (a.name.isDefined) {
>             foundMetrics.add(a.name.get)
>           }
>         }
>       }
>     })
>     for (iter <- 0 until 100) {
>       foundMetrics.clear()
>       println(s"iter = $iter")
>       spark.range(10).groupBy().agg("id" -> "sum").collect()
>       // Wait until all pending listener events have been delivered.
>       spark.sparkContext.listenerBus.waitUntilEmpty(3000)
>       // Fails intermittently: accumulables can still be empty at this point.
>       assert(foundMetrics.size > 0)
>     }
>   }
> {code}
> The problem comes from DAGScheduler.handleTaskCompletion: the
> SparkListenerTaskEnd event is posted before updateAccumulators is called,
> so taskInfo.accumulables might not yet reflect the task's final values.
> The code there looks like it needs refactoring.
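> To make the ordering concrete, here is a self-contained toy model of the
> race (the names TaskInfo/AccumulableInfo mirror Spark's, but this is a
> simplified illustration, not the actual DAGScheduler code):
> {code}
> import scala.collection.mutable
>
> // Minimal stand-ins for Spark's classes, for illustration only.
> case class AccumulableInfo(name: String, value: Long)
>
> class TaskInfo {
>   val accumulables = mutable.Buffer.empty[AccumulableInfo]
> }
>
> object OrderingBugDemo {
>   // Stand-in for a listener's onTaskEnd; in Spark it runs on the
>   // (asynchronous) listener bus, so it races with the update below.
>   def onTaskEnd(info: TaskInfo): Unit =
>     println(s"listener saw ${info.accumulables.size} accumulables")
>
>   def main(args: Array[String]): Unit = {
>     val taskInfo = new TaskInfo
>
>     // Mirrors the order in handleTaskCompletion:
>     onTaskEnd(taskInfo)                                  // 1. event posted first
>     taskInfo.accumulables += AccumulableInfo("sum", 45)  // 2. accumulators merged after
>
>     // Prints "listener saw 0 accumulables": the event carried a reference
>     // to taskInfo before its accumulables were filled in.
>   }
> }
> {code}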


