Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22341#discussion_r216257678

    --- Diff: core/src/main/scala/org/apache/spark/status/AppStatusListener.scala ---
    @@ -646,7 +647,47 @@ private[spark] class AppStatusListener(
         }

       override def onUnpersistRDD(event: SparkListenerUnpersistRDD): Unit = {
    -    liveRDDs.remove(event.rddId)
    +    liveRDDs.remove(event.rddId).foreach { liveRDD =>
    +      val executorsToUpdate = new HashSet[LiveExecutor]()
    +      val storageLevel = liveRDD.info.storageLevel
    +      val distributions = liveRDD.getDistributions()
    +
    +      // Use RDD distribution to update executor memory and disk usage info.
    +      distributions.foreach { case (executorId, rddDist) =>
    +        val maybeExec = liveExecutors.get(executorId)
    +
    +        maybeExec.foreach { exec =>
    +          if (exec.hasMemoryInfo) {
    +            if (storageLevel.useOffHeap) {
    +              exec.usedOffHeap = math.max(0, exec.usedOffHeap - rddDist.offHeapUsed)
    +            } else {
    +              exec.usedOnHeap = math.max(0, exec.usedOnHeap - rddDist.onHeapUsed)
    +            }
    +          }
    +          exec.memoryUsed = math.max(0, exec.memoryUsed - rddDist.memoryUsed)
    +          exec.diskUsed = math.max(0, exec.diskUsed - rddDist.diskUsed)
    +          executorsToUpdate += exec
    +        }
    +      }
    +
    +      // Use RDD partition info to update executor block info.
    +      val partitions = liveRDD.getPartitions()
    +
    +      partitions.foreach { case (_, part) =>
    +        val executors = part.executors
    --- End diff --

    No, a partition can exist on more than one executor.
--- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org