Repository: spark
Updated Branches:
  refs/heads/branch-1.1 1d468df33 -> 8f8e2a4ee


[SPARK-3170][CORE][BUG]: RDD info loss in "StorageTab" and "ExecutorTab"

A completed stage only needs to remove its own partitions that are no longer
cached. However, the "StorageTab" may lose some RDDs that are actually still
cached. And not only the "StorageTab": the "ExecutorTab" may also lose RDD
info that has been overwritten by the last RDD in the same task.
1. "StorageTab": when multiple stages run simultaneously, a completed stage
removes RDD info that belongs to other stages that are still running.
2. "ExecutorTab": the TaskContext may lose some "updatedBlocks" info for RDDs
in a dependency chain, as in the following example:
         val r1 = sc.parallelize(..).cache()
         val r2 = r1.map(...).cache()
         val n = r2.count()

When r2 is counted, both r1 and r2 end up cached, so in
CacheManager.getOrCompute the TaskContext should contain the "updatedBlocks"
of both r1 and r2. Currently, "updatedBlocks" only contains the info of r2.
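A minimal sketch of the intended accumulation, with simplified stand-in types
(a (String, Long) pair here replaces Spark's (BlockId, BlockStatus); this is an
illustration of the fix's pattern, not the actual patch):

         // Stand-in for context.taskMetrics.updatedBlocks.
         var updatedBlocksMetric: Option[Seq[(String, Long)]] = None

         // Append newly updated blocks instead of overwriting previous ones.
         def recordUpdatedBlocks(newBlocks: Seq[(String, Long)]): Unit = {
           val previous = updatedBlocksMetric.getOrElse(Seq.empty[(String, Long)])
           updatedBlocksMetric = Some(previous ++ newBlocks)
         }

         recordUpdatedBlocks(Seq(("rdd_1_0", 100L)))  // caching r1's partition
         recordUpdatedBlocks(Seq(("rdd_2_0", 200L)))  // caching r2's partition
         assert(updatedBlocksMetric.get.size == 2)    // both r1 and r2 reported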

Author: uncleGen <husty...@gmail.com>

Closes #2131 from uncleGen/master_ui_fix and squashes the following commits:

a6a8a0b [uncleGen] fix some coding style
3a1bc15 [uncleGen] fix some error in unit test
56ea488 [uncleGen] there's some line too long
c82ba82 [uncleGen] Bug Fix: RDD info loss in "StorageTab" and "ExecutorTab"

(cherry picked from commit d8298c46b7bf566d1cd2f7ea9b1b2b2722dcfb17)
Signed-off-by: Andrew Or <andrewo...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8f8e2a4e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8f8e2a4e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8f8e2a4e

Branch: refs/heads/branch-1.1
Commit: 8f8e2a4ee7419a96196727704695f5114da5b84e
Parents: 1d468df
Author: uncleGen <husty...@gmail.com>
Authored: Wed Aug 27 10:32:13 2014 -0700
Committer: Andrew Or <andrewo...@gmail.com>
Committed: Wed Aug 27 10:33:13 2014 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/CacheManager.scala   |  4 ++-
 .../apache/spark/ui/storage/StorageTab.scala    |  7 +++--
 .../org/apache/spark/CacheManagerSuite.scala    | 19 ++++++++++++++
 .../spark/ui/storage/StorageTabSuite.scala      | 27 ++++++++++++++++++++
 4 files changed, 54 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/8f8e2a4e/core/src/main/scala/org/apache/spark/CacheManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/CacheManager.scala b/core/src/main/scala/org/apache/spark/CacheManager.scala
index 5ddda4d..f8584b9 100644
--- a/core/src/main/scala/org/apache/spark/CacheManager.scala
+++ b/core/src/main/scala/org/apache/spark/CacheManager.scala
@@ -68,7 +68,9 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
           // Otherwise, cache the values and keep track of any updates in block statuses
           val updatedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
           val cachedValues = putInBlockManager(key, computedValues, storageLevel, updatedBlocks)
-          context.taskMetrics.updatedBlocks = Some(updatedBlocks)
+          val metrics = context.taskMetrics
+          val lastUpdatedBlocks = metrics.updatedBlocks.getOrElse(Seq[(BlockId, BlockStatus)]())
+          metrics.updatedBlocks = Some(lastUpdatedBlocks ++ updatedBlocks.toSeq)
           new InterruptibleIterator(context, cachedValues)
 
         } finally {

http://git-wip-us.apache.org/repos/asf/spark/blob/8f8e2a4e/core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala b/core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala
index 67f72a9..76097f1 100644
--- a/core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala
+++ b/core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala
@@ -70,8 +70,11 @@ class StorageListener(storageStatusListener: StorageStatusListener) extends Spar
   }
 
   override def onStageCompleted(stageCompleted: SparkListenerStageCompleted) = synchronized {
-    // Remove all partitions that are no longer cached
-    _rddInfoMap.retain { case (_, info) => info.numCachedPartitions > 0 }
+    // Remove all partitions that are no longer cached in current completed stage
+    val completedRddIds = stageCompleted.stageInfo.rddInfos.map(r => r.id).toSet
+    _rddInfoMap.retain { case (id, info) =>
+      !completedRddIds.contains(id) || info.numCachedPartitions > 0
+    }
   }
 
   override def onUnpersistRDD(unpersistRDD: SparkListenerUnpersistRDD) = synchronized {

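To make the new retain predicate concrete, here is a self-contained
illustration with hypothetical RDD ids and a simplified info type (not part of
the patch): only entries that belong to the completed stage and have no cached
partitions are dropped.

         import scala.collection.mutable

         case class Info(numCachedPartitions: Int)

         val rddInfoMap = mutable.Map(
           0 -> Info(0),  // in the completed stage, no longer cached -> dropped
           1 -> Info(2),  // in the completed stage, still cached     -> kept
           2 -> Info(0)   // belongs to a stage that is still running -> kept
         )
         val completedRddIds = Set(0, 1)

         rddInfoMap.retain { case (id, info) =>
           !completedRddIds.contains(id) || info.numCachedPartitions > 0
         }
         assert(rddInfoMap.keySet == Set(1, 2))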
http://git-wip-us.apache.org/repos/asf/spark/blob/8f8e2a4e/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala b/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala
index 9c5f394..90dcadc 100644
--- a/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala
@@ -32,6 +32,8 @@ class CacheManagerSuite extends FunSuite with BeforeAndAfter with EasyMockSugar
   var split: Partition = _
   /** An RDD which returns the values [1, 2, 3, 4]. */
   var rdd: RDD[Int] = _
+  var rdd2: RDD[Int] = _
+  var rdd3: RDD[Int] = _
 
   before {
     sc = new SparkContext("local", "test")
@@ -43,6 +45,16 @@ class CacheManagerSuite extends FunSuite with BeforeAndAfter with EasyMockSugar
       override val getDependencies = List[Dependency[_]]()
       override def compute(split: Partition, context: TaskContext) = Array(1, 2, 3, 4).iterator
     }
+    rdd2 = new RDD[Int](sc, List(new OneToOneDependency(rdd))) {
+      override def getPartitions: Array[Partition] = firstParent[Int].partitions
+      override def compute(split: Partition, context: TaskContext) =
+        firstParent[Int].iterator(split, context)
+    }.cache()
+    rdd3 = new RDD[Int](sc, List(new OneToOneDependency(rdd2))) {
+      override def getPartitions: Array[Partition] = firstParent[Int].partitions
+      override def compute(split: Partition, context: TaskContext) =
+        firstParent[Int].iterator(split, context)
+    }.cache()
   }
 
   after {
@@ -87,4 +99,11 @@ class CacheManagerSuite extends FunSuite with BeforeAndAfter with EasyMockSugar
       assert(value.toList === List(1, 2, 3, 4))
     }
   }
+
+  test("verify task metrics updated correctly") {
+    cacheManager = sc.env.cacheManager
+    val context = new TaskContext(0, 0, 0)
+    cacheManager.getOrCompute(rdd3, split, context, StorageLevel.MEMORY_ONLY)
+    assert(context.taskMetrics.updatedBlocks.getOrElse(Seq()).size === 2)
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/8f8e2a4e/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala b/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala
index b860177..a537c72 100644
--- a/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala
@@ -34,6 +34,7 @@ class StorageTabSuite extends FunSuite with BeforeAndAfter {
   private val memOnly = StorageLevel.MEMORY_ONLY
   private val none = StorageLevel.NONE
   private val taskInfo = new TaskInfo(0, 0, 0, 0, "big", "dog", TaskLocality.ANY, false)
+  private val taskInfo1 = new TaskInfo(1, 1, 1, 1, "big", "cat", TaskLocality.ANY, false)
   private def rddInfo0 = new RDDInfo(0, "freedom", 100, memOnly)
   private def rddInfo1 = new RDDInfo(1, "hostage", 200, memOnly)
   private def rddInfo2 = new RDDInfo(2, "sanity", 300, memAndDisk)
@@ -162,4 +163,30 @@ class StorageTabSuite extends FunSuite with BeforeAndAfter {
     assert(storageListener._rddInfoMap(2).numCachedPartitions === 0)
   }
 
+  test("verify StorageTab contains all cached rdds") {
+
+    val rddInfo0 = new RDDInfo(0, "rdd0", 1, memOnly)
+    val rddInfo1 = new RDDInfo(1, "rdd1", 1, memOnly)
+    val stageInfo0 = new StageInfo(0, 0, "stage0", 1, Seq(rddInfo0), "details")
+    val stageInfo1 = new StageInfo(1, 0, "stage1", 1, Seq(rddInfo1), "details")
+    val taskMetrics0 = new TaskMetrics
+    val taskMetrics1 = new TaskMetrics
+    val block0 = (RDDBlockId(0, 1), BlockStatus(memOnly, 100L, 0L, 0L))
+    val block1 = (RDDBlockId(1, 1), BlockStatus(memOnly, 200L, 0L, 0L))
+    taskMetrics0.updatedBlocks = Some(Seq(block0))
+    taskMetrics1.updatedBlocks = Some(Seq(block1))
+    bus.postToAll(SparkListenerBlockManagerAdded(bm1, 1000L))
+    bus.postToAll(SparkListenerStageSubmitted(stageInfo0))
+    assert(storageListener.rddInfoList.size === 0)
+    bus.postToAll(SparkListenerTaskEnd(0, 0, "big", Success, taskInfo, taskMetrics0))
+    assert(storageListener.rddInfoList.size === 1)
+    bus.postToAll(SparkListenerStageSubmitted(stageInfo1))
+    assert(storageListener.rddInfoList.size === 1)
+    bus.postToAll(SparkListenerStageCompleted(stageInfo0))
+    assert(storageListener.rddInfoList.size === 1)
+    bus.postToAll(SparkListenerTaskEnd(1, 0, "small", Success, taskInfo1, taskMetrics1))
+    assert(storageListener.rddInfoList.size === 2)
+    bus.postToAll(SparkListenerStageCompleted(stageInfo1))
+    assert(storageListener.rddInfoList.size === 2)
+  }
 }

