Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/22341#discussion_r216406188 --- Diff: core/src/main/scala/org/apache/spark/status/AppStatusListener.scala --- @@ -646,7 +647,47 @@ private[spark] class AppStatusListener( } override def onUnpersistRDD(event: SparkListenerUnpersistRDD): Unit = { - liveRDDs.remove(event.rddId) + liveRDDs.remove(event.rddId).foreach { liveRDD => + val executorsToUpdate = new HashSet[LiveExecutor]() + val storageLevel = liveRDD.info.storageLevel + val distributions = liveRDD.getDistributions() + + // Use RDD distribution to update executor memory and disk usage info. + distributions.foreach { case (executorId, rddDist) => + val maybeExec = liveExecutors.get(executorId) + + maybeExec.foreach { exec => + if (exec.hasMemoryInfo) { + if (storageLevel.useOffHeap) { + exec.usedOffHeap = math.max(0, exec.usedOffHeap - rddDist.offHeapUsed) --- End diff -- I'd take the `newValue` method inside `updateRDDBlock` and make it a proper private method (with a better name), since then it becomes clearer why this logic is needed.
--- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org For additional commands, e-mail: reviews-help@spark.apache.org