[ https://issues.apache.org/jira/browse/SPARK-21009?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Bogdan Raducanu updated SPARK-21009:
------------------------------------
    Description:
The following code reproduces it:
{code}
test("test") {
  val foundMetrics = mutable.Set.empty[String]
  spark.sparkContext.addSparkListener(new SparkListener {
    override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
      taskEnd.taskInfo.accumulables.foreach { a =>
        if (a.name.isDefined) {
          foundMetrics.add(a.name.get)
        }
      }
    }
  })
  for (iter <- 0 until 100) {
    foundMetrics.clear()
    println(s"iter = $iter")
    spark.range(10).groupBy().agg("id" -> "sum").collect
    spark.sparkContext.listenerBus.waitUntilEmpty(3000)
    assert(foundMetrics.size > 0)
  }
}
{code}
The problem comes from DAGScheduler.handleTaskCompletion: the SparkListenerTaskEnd event is posted before updateAccumulators is called, so taskEnd.taskInfo.accumulables may be stale when listeners observe it. The code there looks like it needs refactoring.

> SparkListenerTaskEnd.taskInfo.accumulables might not be accurate
> ----------------------------------------------------------------
>
>                 Key: SPARK-21009
>                 URL: https://issues.apache.org/jira/browse/SPARK-21009
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.2.0
>            Reporter: Bogdan Raducanu
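For what it's worth, the ordering can be illustrated without Spark. In this minimal sketch (plain Scala; `FakeTaskInfo`, `onTaskEnd`, and `handleTaskCompletion` are hypothetical stand-ins mirroring the shapes of the real classes, not Spark's actual implementation), the listener is notified before the accumulables are merged, so it snapshots stale state:

```scala
import scala.collection.mutable

// Hypothetical stand-in for Spark's TaskInfo: accumulables are filled in
// by the scheduler after the task completes.
case class FakeTaskInfo(accumulables: mutable.Buffer[String] = mutable.Buffer.empty)

object OrderingSketch {
  val seenByListener = mutable.Buffer.empty[String]

  // Stand-in for SparkListener.onTaskEnd: it reads whatever accumulables
  // are present at the moment the event is posted.
  def onTaskEnd(info: FakeTaskInfo): Unit =
    seenByListener ++= info.accumulables

  // Mirrors the order described above for DAGScheduler.handleTaskCompletion:
  // the task-end event goes out first, the accumulator merge happens after.
  def handleTaskCompletion(info: FakeTaskInfo): Unit = {
    onTaskEnd(info)            // event posted here...
    info.accumulables += "sum" // ...accumulators merged here, too late for the listener
  }
}
```

Calling `handleTaskCompletion` on a fresh `FakeTaskInfo` leaves `seenByListener` empty even though the task's accumulables do end up populated, which is the symptom the test above hits intermittently; posting the event only after the merge makes the sketch deterministic.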