[ https://issues.apache.org/jira/browse/SPARK-21009?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Bogdan Raducanu updated SPARK-21009:
------------------------------------
    Description: 
The following test reproduces the issue:
{code}
  test("test") {
    val foundMetrics = mutable.Set.empty[String]
    spark.sparkContext.addSparkListener(new SparkListener {
      override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
        taskEnd.taskInfo.accumulables.foreach { a =>
          if (a.name.isDefined) {
            foundMetrics.add(a.name.get)
          }
        }
      }
    })
    for (iter <- 0 until 100) {
      foundMetrics.clear()
      println(s"iter = $iter")
      spark.range(10).groupBy().agg("id" -> "sum").collect
      spark.sparkContext.listenerBus.waitUntilEmpty(3000)
      assert(foundMetrics.size > 0)
    }
  }
{code}

The problem comes from DAGScheduler.handleTaskCompletion: the SparkListenerTaskEnd event is posted before updateAccumulators is called, so taskInfo.accumulables may not yet contain the task's accumulator updates when listeners receive the event. This is why the assertion above fails intermittently. The code there looks like it needs refactoring.
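
For illustration, here is a minimal, self-contained model of that ordering (FakeTaskInfo and postTaskEnd are made-up stand-ins, not Spark's actual classes or the real handleTaskCompletion body):
{code}
import scala.collection.mutable

// Hypothetical stand-in for TaskInfo: the listener only sees whatever
// is in `accumulables` at the moment the event is posted.
case class FakeTaskInfo(accumulables: mutable.Buffer[String] = mutable.Buffer.empty)

object AccumulableOrderingDemo extends App {
  // Models listenerBus.post(SparkListenerTaskEnd(...)).
  def postTaskEnd(info: FakeTaskInfo): Unit =
    println(s"listener sees accumulables = ${info.accumulables.toList}")

  // Current (buggy) order: the event is posted first, the accumulator
  // updates are merged into the task info afterwards.
  val buggy = FakeTaskInfo()
  postTaskEnd(buggy)                                    // prints List()
  buggy.accumulables += "internal.metrics.resultSize"   // too late

  // Expected order: merge the updates first, then post the event.
  val fixed = FakeTaskInfo()
  fixed.accumulables += "internal.metrics.resultSize"
  postTaskEnd(fixed)                                    // listener sees the metric
}
{code}
In DAGScheduler itself the fix would presumably be to merge the accumulator updates into taskInfo before the event is posted, though the exact refactoring is left open here.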

> SparkListenerTaskEnd.taskInfo.accumulables might not be accurate
> ----------------------------------------------------------------
>
>                 Key: SPARK-21009
>                 URL: https://issues.apache.org/jira/browse/SPARK-21009
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.2.0
>            Reporter: Bogdan Raducanu
>