This is an automated email from the ASF dual-hosted git repository. vanzin pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 6207360 [SPARK-27012][CORE] Storage tab shows rdd details even after executor ended 6207360 is described below commit 6207360b00ed5e888ac460e6cf9de7cdd2478bff Author: Ajith <ajith2...@gmail.com> AuthorDate: Tue Mar 5 10:40:38 2019 -0800 [SPARK-27012][CORE] Storage tab shows rdd details even after executor ended ## What changes were proposed in this pull request? After we cache a table, we can see its details in the Storage tab of the Spark UI. If the executor has shut down (graceful shutdown / dynamic executor allocation scenario), the UI still shows the RDD as cached, and clicking the link throws an error. This is because, on an executor-removed event, we fail to adjust the RDD partition details in org.apache.spark.status.AppStatusListener#onExecutorRemoved. ## How was this patch tested? Tested this fix manually in the UI. Edit: added a unit test. Closes #23920 from ajithme/cachestorage. Authored-by: Ajith <ajith2...@gmail.com> Signed-off-by: Marcelo Vanzin <van...@cloudera.com> --- .../apache/spark/status/AppStatusListener.scala | 24 +++++ .../spark/status/AppStatusListenerSuite.scala | 100 +++++++++++++++++++++ 2 files changed, 124 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala index a8b2153..fc64bef 100644 --- a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala +++ b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala @@ -211,6 +211,30 @@ private[spark] class AppStatusListener( update(rdd, now) } } + // Remove all RDD partitions that reference the removed executor + liveRDDs.values.foreach { rdd => + rdd.getPartitions.values + .filter(_.executors.contains(event.executorId)) + .foreach { partition => + if (partition.executors.length == 1) { + rdd.removePartition(partition.blockName) + rdd.memoryUsed = addDeltaToValue(rdd.memoryUsed, partition.memoryUsed * -1) + rdd.diskUsed = 
addDeltaToValue(rdd.diskUsed, partition.diskUsed * -1) + } else { + rdd.memoryUsed = addDeltaToValue(rdd.memoryUsed, + (partition.memoryUsed / partition.executors.length) * -1) + rdd.diskUsed = addDeltaToValue(rdd.diskUsed, + (partition.diskUsed / partition.executors.length) * -1) + partition.update(partition.executors + .filter(!_.equals(event.executorId)), rdd.storageLevel, + addDeltaToValue(partition.memoryUsed, + (partition.memoryUsed / partition.executors.length) * -1), + addDeltaToValue(partition.diskUsed, + (partition.diskUsed / partition.executors.length) * -1)) + } + } + update(rdd, now) + } if (isExecutorActiveForLiveStages(exec)) { // the executor was running for a currently active stage, so save it for now in // deadExecutors, and remove when there are no active stages overlapping with the diff --git a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala index 9f51bd7..b580066 100644 --- a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala @@ -1520,6 +1520,106 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter { } } + test("storage information on executor lost/down") { + val listener = new AppStatusListener(store, conf, true) + val maxMemory = 42L + + // Register a couple of block managers. + val bm1 = BlockManagerId("1", "1.example.com", 42) + val bm2 = BlockManagerId("2", "2.example.com", 84) + Seq(bm1, bm2).foreach { bm => + listener.onExecutorAdded(SparkListenerExecutorAdded(1L, bm.executorId, + new ExecutorInfo(bm.host, 1, Map.empty, Map.empty))) + listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(1L, bm, maxMemory)) + } + + val rdd1b1 = RddBlock(1, 1, 1L, 2L) + val rdd1b2 = RddBlock(1, 2, 3L, 4L) + val level = StorageLevel.MEMORY_AND_DISK + + // Submit a stage and make sure the RDDs are recorded. 
+ val rdd1Info = new RDDInfo(rdd1b1.rddId, "rdd1", 2, level, Nil) + val stage = new StageInfo(1, 0, "stage1", 4, Seq(rdd1Info), Nil, "details1") + listener.onStageSubmitted(SparkListenerStageSubmitted(stage, new Properties())) + + // Add partition 1 replicated on two block managers. + listener.onBlockUpdated(SparkListenerBlockUpdated( + BlockUpdatedInfo(bm1, rdd1b1.blockId, level, rdd1b1.memSize, rdd1b1.diskSize))) + + listener.onBlockUpdated(SparkListenerBlockUpdated( + BlockUpdatedInfo(bm2, rdd1b1.blockId, level, rdd1b1.memSize, rdd1b1.diskSize))) + + // Add a second partition only to bm 1. + listener.onBlockUpdated(SparkListenerBlockUpdated( + BlockUpdatedInfo(bm1, rdd1b2.blockId, level, rdd1b2.memSize, rdd1b2.diskSize))) + + check[RDDStorageInfoWrapper](rdd1b1.rddId) { wrapper => + assert(wrapper.info.numCachedPartitions === 2L) + assert(wrapper.info.memoryUsed === 2 * rdd1b1.memSize + rdd1b2.memSize) + assert(wrapper.info.diskUsed === 2 * rdd1b1.diskSize + rdd1b2.diskSize) + assert(wrapper.info.dataDistribution.get.size === 2L) + assert(wrapper.info.partitions.get.size === 2L) + + val dist = wrapper.info.dataDistribution.get.find(_.address == bm1.hostPort).get + assert(dist.memoryUsed === rdd1b1.memSize + rdd1b2.memSize) + assert(dist.diskUsed === rdd1b1.diskSize + rdd1b2.diskSize) + assert(dist.memoryRemaining === maxMemory - dist.memoryUsed) + + val part1 = wrapper.info.partitions.get.find(_.blockName === rdd1b1.blockId.name).get + assert(part1.storageLevel === level.description) + assert(part1.memoryUsed === 2 * rdd1b1.memSize) + assert(part1.diskUsed === 2 * rdd1b1.diskSize) + assert(part1.executors === Seq(bm1.executorId, bm2.executorId)) + + val part2 = wrapper.info.partitions.get.find(_.blockName === rdd1b2.blockId.name).get + assert(part2.storageLevel === level.description) + assert(part2.memoryUsed === rdd1b2.memSize) + assert(part2.diskUsed === rdd1b2.diskSize) + assert(part2.executors === Seq(bm1.executorId)) + } + + 
check[ExecutorSummaryWrapper](bm1.executorId) { exec => + assert(exec.info.rddBlocks === 2L) + assert(exec.info.memoryUsed === rdd1b1.memSize + rdd1b2.memSize) + assert(exec.info.diskUsed === rdd1b1.diskSize + rdd1b2.diskSize) + } + + // Remove Executor 1. + listener.onExecutorRemoved(createExecutorRemovedEvent(1)) + + // check that partition info now contains only details about what is remaining in bm2 + check[RDDStorageInfoWrapper](rdd1b1.rddId) { wrapper => + assert(wrapper.info.numCachedPartitions === 1L) + assert(wrapper.info.memoryUsed === rdd1b1.memSize) + assert(wrapper.info.diskUsed === rdd1b1.diskSize) + assert(wrapper.info.dataDistribution.get.size === 1L) + assert(wrapper.info.partitions.get.size === 1L) + + val dist = wrapper.info.dataDistribution.get.find(_.address == bm2.hostPort).get + assert(dist.memoryUsed === rdd1b1.memSize) + assert(dist.diskUsed === rdd1b1.diskSize) + assert(dist.memoryRemaining === maxMemory - dist.memoryUsed) + + val part = wrapper.info.partitions.get.find(_.blockName === rdd1b1.blockId.name).get + assert(part.storageLevel === level.description) + assert(part.memoryUsed === rdd1b1.memSize) + assert(part.diskUsed === rdd1b1.diskSize) + assert(part.executors === Seq(bm2.executorId)) + } + + // Remove Executor 2. 
+ listener.onExecutorRemoved(createExecutorRemovedEvent(2)) + // Check that storage cost is zero as both exec are down + check[RDDStorageInfoWrapper](rdd1b1.rddId) { wrapper => + assert(wrapper.info.numCachedPartitions === 0) + assert(wrapper.info.memoryUsed === 0) + assert(wrapper.info.diskUsed === 0) + assert(wrapper.info.dataDistribution.isEmpty) + assert(wrapper.info.partitions.get.isEmpty) + } + } + + private def key(stage: StageInfo): Array[Int] = Array(stage.stageId, stage.attemptNumber) private def check[T: ClassTag](key: Any)(fn: T => Unit): Unit = { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org