Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22341#discussion_r216406188
  
    --- Diff: core/src/main/scala/org/apache/spark/status/AppStatusListener.scala ---
    @@ -646,7 +647,47 @@ private[spark] class AppStatusListener(
       }
     
       override def onUnpersistRDD(event: SparkListenerUnpersistRDD): Unit = {
    -    liveRDDs.remove(event.rddId)
    +    liveRDDs.remove(event.rddId).foreach { liveRDD =>
    +      val executorsToUpdate = new HashSet[LiveExecutor]()
    +      val storageLevel = liveRDD.info.storageLevel
    +      val distributions = liveRDD.getDistributions()
    +
    +      // Use RDD distribution to update executor memory and disk usage info.
    +      distributions.foreach { case (executorId, rddDist) =>
    +        val maybeExec = liveExecutors.get(executorId)
    +
    +        maybeExec.foreach { exec =>
    +          if (exec.hasMemoryInfo) {
    +            if (storageLevel.useOffHeap) {
    +              exec.usedOffHeap = math.max(0, exec.usedOffHeap - rddDist.offHeapUsed)
    --- End diff --
    
    I'd take the `newValue` method inside `updateRDDBlock` and make it a proper private method (with a better name), since then it becomes clearer why this logic is needed.
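
    For illustration only (not part of the review or the patch): a minimal, compilable sketch of the kind of extraction being suggested. The helper name `addDeltaToValue` and the surrounding object and locals are stand-ins rather than the listener's actual API; the point is that a named private method makes the "clamp at zero" intent explicit at every call site.

    ```scala
    object AddDeltaSketch {
      // The kind of helper the review suggests extracting: apply a (possibly
      // negative) delta to a usage counter without letting it drop below zero.
      private def addDeltaToValue(old: Long, delta: Long): Long = math.max(0, old + delta)

      def main(args: Array[String]): Unit = {
        var usedOffHeap = 100L       // what the executor currently tracks
        val offHeapUsedByRdd = 150L  // what the unpersisted RDD reported

        // Without the clamp this would end up at -50; with it the counter bottoms out at 0.
        usedOffHeap = addDeltaToValue(usedOffHeap, -offHeapUsedByRdd)
        println(usedOffHeap)         // prints 0
      }
    }
    ```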


---
