[ https://issues.apache.org/jira/browse/SPARK-21009?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16041055#comment-16041055 ]
Bogdan Raducanu commented on SPARK-21009:
-----------------------------------------

Yes, this looks like a duplicate. I posted the repro code in that one. I'll close this one.

> SparkListenerTaskEnd.taskInfo.accumulables might not be accurate
> ----------------------------------------------------------------
>
>                 Key: SPARK-21009
>                 URL: https://issues.apache.org/jira/browse/SPARK-21009
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.2.0
>            Reporter: Bogdan Raducanu
>
> The following test reproduces it: on the iterations where the bug hits, the
> listener sees no named accumulables on the task-end event and the assertion
> fails.
> {code}
> import scala.collection.mutable
>
> import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
>
> test("test") {
>   val foundMetrics = mutable.Set.empty[String]
>   // Record the names of all accumulables visible on each task-end event.
>   spark.sparkContext.addSparkListener(new SparkListener {
>     override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
>       taskEnd.taskInfo.accumulables.foreach { a =>
>         if (a.name.isDefined) {
>           foundMetrics.add(a.name.get)
>         }
>       }
>     }
>   })
>   for (iter <- 0 until 100) {
>     foundMetrics.clear()
>     println(s"iter = $iter")
>     spark.range(10).groupBy().agg("id" -> "sum").collect()
>     // Wait for all pending listener events to be delivered before asserting.
>     spark.sparkContext.listenerBus.waitUntilEmpty(3000)
>     assert(foundMetrics.size > 0)
>   }
> }
> {code}
> The problem comes from DAGScheduler.handleTaskCompletion: the
> SparkListenerTaskEnd event is posted before updateAccumulators is called,
> so taskInfo.accumulables may not be up to date yet when listeners observe
> it. The code there looks like it needs refactoring.
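For readers following along, here is a minimal, self-contained sketch of the ordering problem described above. All names in it (TaskInfo, Listener, handleTaskCompletion, AccumulatorOrderingSketch) are simplified stand-ins for illustration, not Spark's actual DAGScheduler code: the point is only that posting the event before merging the accumulator updates lets a listener observe a stale snapshot.

{code}
import scala.collection.mutable

// Hypothetical, simplified stand-ins; not Spark's real classes.
object AccumulatorOrderingSketch {
  final case class TaskInfo(
      accumulables: mutable.Buffer[String] = mutable.Buffer.empty[String])

  trait Listener { def onTaskEnd(info: TaskInfo): Unit }

  def handleTaskCompletion(info: TaskInfo, listeners: Seq[Listener]): Unit = {
    // 1. The event is posted first, so listeners see the current (stale) state...
    listeners.foreach(_.onTaskEnd(info))
    // 2. ...and only afterwards are the task's accumulator updates merged in.
    info.accumulables += "sum"
  }

  def main(args: Array[String]): Unit = {
    var seenCount = -1
    val listener = new Listener {
      override def onTaskEnd(info: TaskInfo): Unit =
        seenCount = info.accumulables.size
    }
    handleTaskCompletion(TaskInfo(), Seq(listener))
    // Prints 0: the listener never saw the "sum" accumulable.
    println(s"accumulables visible to the listener: $seenCount")
  }
}
{code}

Under this reading, the fix would be an ordering change in handleTaskCompletion: merge the accumulator updates into the TaskInfo before posting SparkListenerTaskEnd, rather than after.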