spark git commit: [SPARK-9044] Fix "Storage" tab in UI so that it reflects RDD name change.
Repository: spark Updated Branches: refs/heads/branch-2.0 f63ba2210 -> 69327667d [SPARK-9044] Fix "Storage" tab in UI so that it reflects RDD name change. ## What changes were proposed in this pull request? 1. Making 'name' field of RDDInfo mutable. 2. In StorageListener: catching the fact that RDD's name was changed and updating it in RDDInfo. ## How was this patch tested? 1. Manual verification - the 'Storage' tab now behaves as expected. 2. The commit also contains a new unit test which verifies this. Author: Lukasz Closes #13264 from lgieron/SPARK-9044. (cherry picked from commit b120fba6ae26186b3fa0dfbb1637046f4e76c2b0) Signed-off-by: Shixiong Zhu Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/69327667 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/69327667 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/69327667 Branch: refs/heads/branch-2.0 Commit: 69327667d5a14b12de8055d752fbe3abb8d6671c Parents: f63ba22 Author: Lukasz Authored: Wed May 25 10:24:21 2016 -0700 Committer: Shixiong Zhu Committed: Wed May 25 10:24:28 2016 -0700 -- .../scala/org/apache/spark/storage/RDDInfo.scala | 2 +- .../org/apache/spark/ui/storage/StorageTab.scala | 2 +- .../apache/spark/ui/storage/StorageTabSuite.scala | 17 + 3 files changed, 19 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/69327667/core/src/main/scala/org/apache/spark/storage/RDDInfo.scala -- diff --git a/core/src/main/scala/org/apache/spark/storage/RDDInfo.scala b/core/src/main/scala/org/apache/spark/storage/RDDInfo.scala index 083d78b..e5abbf7 100644 --- a/core/src/main/scala/org/apache/spark/storage/RDDInfo.scala +++ b/core/src/main/scala/org/apache/spark/storage/RDDInfo.scala @@ -24,7 +24,7 @@ import org.apache.spark.util.Utils @DeveloperApi class RDDInfo( val id: Int, -val name: String, +var name: String, val numPartitions: Int, var storageLevel: StorageLevel, val parentIds: Seq[Int], 
http://git-wip-us.apache.org/repos/asf/spark/blob/69327667/core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala -- diff --git a/core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala b/core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala index 5009583..c212362 100644 --- a/core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala +++ b/core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala @@ -59,7 +59,7 @@ class StorageListener(storageStatusListener: StorageStatusListener) extends Bloc override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = synchronized { val rddInfos = stageSubmitted.stageInfo.rddInfos -rddInfos.foreach { info => _rddInfoMap.getOrElseUpdate(info.id, info) } +rddInfos.foreach { info => _rddInfoMap.getOrElseUpdate(info.id, info).name = info.name } } override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = synchronized { http://git-wip-us.apache.org/repos/asf/spark/blob/69327667/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala -- diff --git a/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala b/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala index 7d77dee..411a0dd 100644 --- a/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala +++ b/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala @@ -179,6 +179,23 @@ class StorageTabSuite extends SparkFunSuite with BeforeAndAfter { assert(storageListener.rddInfoList.size === 2) } + test("verify StorageTab still contains a renamed RDD") { +val rddInfo = new RDDInfo(0, "original_name", 1, memOnly, Seq(4)) +val stageInfo0 = new StageInfo(0, 0, "stage0", 1, Seq(rddInfo), Seq.empty, "details") +bus.postToAll(SparkListenerBlockManagerAdded(1L, bm1, 1000L)) +bus.postToAll(SparkListenerStageSubmitted(stageInfo0)) +val blockUpdateInfos1 = Seq(BlockUpdatedInfo(bm1, RDDBlockId(0, 1), memOnly, 100L, 0L)) 
+postUpdateBlocks(bus, blockUpdateInfos1) +assert(storageListener.rddInfoList.size == 1) + +val newName = "new_name" +val rddInfoRenamed = new RDDInfo(0, newName, 1, memOnly, Seq(4)) +val stageInfo1 = new StageInfo(1, 0, "stage1", 1, Seq(rddInfoRenamed), Seq.empty, "details") +bus.postToAll(SparkListenerStageSubmitted(stageInfo1)) +assert(storageListener.rddInfoList.size == 1) +
spark git commit: [SPARK-9044] Fix "Storage" tab in UI so that it reflects RDD name change.
Repository: spark Updated Branches: refs/heads/master 4f27b8dd5 -> b120fba6a [SPARK-9044] Fix "Storage" tab in UI so that it reflects RDD name change. ## What changes were proposed in this pull request? 1. Making 'name' field of RDDInfo mutable. 2. In StorageListener: catching the fact that RDD's name was changed and updating it in RDDInfo. ## How was this patch tested? 1. Manual verification - the 'Storage' tab now behaves as expected. 2. The commit also contains a new unit test which verifies this. Author: Lukasz Closes #13264 from lgieron/SPARK-9044. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b120fba6 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b120fba6 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b120fba6 Branch: refs/heads/master Commit: b120fba6ae26186b3fa0dfbb1637046f4e76c2b0 Parents: 4f27b8d Author: Lukasz Authored: Wed May 25 10:24:21 2016 -0700 Committer: Shixiong Zhu Committed: Wed May 25 10:24:21 2016 -0700 -- .../scala/org/apache/spark/storage/RDDInfo.scala | 2 +- .../org/apache/spark/ui/storage/StorageTab.scala | 2 +- .../apache/spark/ui/storage/StorageTabSuite.scala | 17 + 3 files changed, 19 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/b120fba6/core/src/main/scala/org/apache/spark/storage/RDDInfo.scala -- diff --git a/core/src/main/scala/org/apache/spark/storage/RDDInfo.scala b/core/src/main/scala/org/apache/spark/storage/RDDInfo.scala index 083d78b..e5abbf7 100644 --- a/core/src/main/scala/org/apache/spark/storage/RDDInfo.scala +++ b/core/src/main/scala/org/apache/spark/storage/RDDInfo.scala @@ -24,7 +24,7 @@ import org.apache.spark.util.Utils @DeveloperApi class RDDInfo( val id: Int, -val name: String, +var name: String, val numPartitions: Int, var storageLevel: StorageLevel, val parentIds: Seq[Int], 
http://git-wip-us.apache.org/repos/asf/spark/blob/b120fba6/core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala -- diff --git a/core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala b/core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala index 5009583..c212362 100644 --- a/core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala +++ b/core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala @@ -59,7 +59,7 @@ class StorageListener(storageStatusListener: StorageStatusListener) extends Bloc override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = synchronized { val rddInfos = stageSubmitted.stageInfo.rddInfos -rddInfos.foreach { info => _rddInfoMap.getOrElseUpdate(info.id, info) } +rddInfos.foreach { info => _rddInfoMap.getOrElseUpdate(info.id, info).name = info.name } } override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = synchronized { http://git-wip-us.apache.org/repos/asf/spark/blob/b120fba6/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala -- diff --git a/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala b/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala index 7d77dee..411a0dd 100644 --- a/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala +++ b/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala @@ -179,6 +179,23 @@ class StorageTabSuite extends SparkFunSuite with BeforeAndAfter { assert(storageListener.rddInfoList.size === 2) } + test("verify StorageTab still contains a renamed RDD") { +val rddInfo = new RDDInfo(0, "original_name", 1, memOnly, Seq(4)) +val stageInfo0 = new StageInfo(0, 0, "stage0", 1, Seq(rddInfo), Seq.empty, "details") +bus.postToAll(SparkListenerBlockManagerAdded(1L, bm1, 1000L)) +bus.postToAll(SparkListenerStageSubmitted(stageInfo0)) +val blockUpdateInfos1 = Seq(BlockUpdatedInfo(bm1, RDDBlockId(0, 1), memOnly, 100L, 0L)) 
+postUpdateBlocks(bus, blockUpdateInfos1) +assert(storageListener.rddInfoList.size == 1) + +val newName = "new_name" +val rddInfoRenamed = new RDDInfo(0, newName, 1, memOnly, Seq(4)) +val stageInfo1 = new StageInfo(1, 0, "stage1", 1, Seq(rddInfoRenamed), Seq.empty, "details") +bus.postToAll(SparkListenerStageSubmitted(stageInfo1)) +assert(storageListener.rddInfoList.size == 1) +assert(storageListener.rddInfoList.head.name == newName) + } + private def postUpdateBlocks( bus: SparkListenerBus, blockUpdateInfos: Seq[BlockUpdatedInfo]):