Repository: spark
Updated Branches:
  refs/heads/branch-1.4 34edaa8ac -> 863ec0cb4


http://git-wip-us.apache.org/repos/asf/spark/blob/863ec0cb/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index ee02fbd..3f162d1 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -28,10 +28,11 @@ import org.json4s.DefaultFormats
 import org.json4s.JsonDSL._
 import org.json4s.JsonAST._
 
+import org.apache.spark._
 import org.apache.spark.executor._
+import org.apache.spark.rdd.RDDOperationScope
 import org.apache.spark.scheduler._
 import org.apache.spark.storage._
-import org.apache.spark._
 
 /**
  * Serializes SparkListener events to/from JSON.  This protocol provides strong backwards-
@@ -228,6 +229,7 @@ private[spark] object JsonProtocol {
 
   def stageInfoToJson(stageInfo: StageInfo): JValue = {
     val rddInfo = JArray(stageInfo.rddInfos.map(rddInfoToJson).toList)
+    val parentIds = JArray(stageInfo.parentIds.map(JInt(_)).toList)
     val submissionTime = stageInfo.submissionTime.map(JInt(_)).getOrElse(JNothing)
     val completionTime = stageInfo.completionTime.map(JInt(_)).getOrElse(JNothing)
     val failureReason = stageInfo.failureReason.map(JString(_)).getOrElse(JNothing)
@@ -236,6 +238,7 @@ private[spark] object JsonProtocol {
     ("Stage Name" -> stageInfo.name) ~
     ("Number of Tasks" -> stageInfo.numTasks) ~
     ("RDD Info" -> rddInfo) ~
+    ("Parent IDs" -> parentIds) ~
     ("Details" -> stageInfo.details) ~
     ("Submission Time" -> submissionTime) ~
     ("Completion Time" -> completionTime) ~
@@ -368,8 +371,11 @@ private[spark] object JsonProtocol {
 
   def rddInfoToJson(rddInfo: RDDInfo): JValue = {
     val storageLevel = storageLevelToJson(rddInfo.storageLevel)
+    val parentIds = JArray(rddInfo.parentIds.map(JInt(_)).toList)
     ("RDD ID" -> rddInfo.id) ~
     ("Name" -> rddInfo.name) ~
+    ("Scope" -> rddInfo.scope.map(_.toJson)) ~
+    ("Parent IDs" -> parentIds) ~
     ("Storage Level" -> storageLevel) ~
     ("Number of Partitions" -> rddInfo.numPartitions) ~
     ("Number of Cached Partitions" -> rddInfo.numCachedPartitions) ~
@@ -519,7 +525,7 @@ private[spark] object JsonProtocol {
     // The "Stage Infos" field was added in Spark 1.2.0
     val stageInfos = Utils.jsonOption(json \ "Stage Infos")
       .map(_.extract[Seq[JValue]].map(stageInfoFromJson)).getOrElse {
-        stageIds.map(id => new StageInfo(id, 0, "unknown", 0, Seq.empty, "unknown"))
+        stageIds.map(id => new StageInfo(id, 0, "unknown", 0, Seq.empty, Seq.empty, "unknown"))
       }
     SparkListenerJobStart(jobId, submissionTime, stageInfos, properties)
   }
@@ -599,7 +605,10 @@ private[spark] object JsonProtocol {
     val attemptId = (json \ "Stage Attempt ID").extractOpt[Int].getOrElse(0)
     val stageName = (json \ "Stage Name").extract[String]
     val numTasks = (json \ "Number of Tasks").extract[Int]
-    val rddInfos = (json \ "RDD Info").extract[List[JValue]].map(rddInfoFromJson(_))
+    val rddInfos = (json \ "RDD Info").extract[List[JValue]].map(rddInfoFromJson)
+    val parentIds = Utils.jsonOption(json \ "Parent IDs")
+      .map { l => l.extract[List[JValue]].map(_.extract[Int]) }
+      .getOrElse(Seq.empty)
     val details = (json \ "Details").extractOpt[String].getOrElse("")
     val submissionTime = Utils.jsonOption(json \ "Submission Time").map(_.extract[Long])
     val completionTime = Utils.jsonOption(json \ "Completion Time").map(_.extract[Long])
@@ -609,7 +618,8 @@ private[spark] object JsonProtocol {
       case None => Seq[AccumulableInfo]()
     }
 
-    val stageInfo = new StageInfo(stageId, attemptId, stageName, numTasks, rddInfos, details)
+    val stageInfo = new StageInfo(
+      stageId, attemptId, stageName, numTasks, rddInfos, parentIds, details)
     stageInfo.submissionTime = submissionTime
     stageInfo.completionTime = completionTime
     stageInfo.failureReason = failureReason
@@ -785,16 +795,22 @@ private[spark] object JsonProtocol {
   def rddInfoFromJson(json: JValue): RDDInfo = {
     val rddId = (json \ "RDD ID").extract[Int]
     val name = (json \ "Name").extract[String]
+    val scope = Utils.jsonOption(json \ "Scope")
+      .map(_.extract[String])
+      .map(RDDOperationScope.fromJson)
+    val parentIds = Utils.jsonOption(json \ "Parent IDs")
+      .map { l => l.extract[List[JValue]].map(_.extract[Int]) }
+      .getOrElse(Seq.empty)
     val storageLevel = storageLevelFromJson(json \ "Storage Level")
     val numPartitions = (json \ "Number of Partitions").extract[Int]
     val numCachedPartitions = (json \ "Number of Cached Partitions").extract[Int]
     val memSize = (json \ "Memory Size").extract[Long]
-    // fallback to tachyon for backward compatability
+    // fallback to tachyon for backward compatibility
     val externalBlockStoreSize = (json \ "ExternalBlockStore Size").toSome
       .getOrElse(json \ "Tachyon Size").extract[Long]
     val diskSize = (json \ "Disk Size").extract[Long]
 
-    val rddInfo = new RDDInfo(rddId, name, numPartitions, storageLevel)
+    val rddInfo = new RDDInfo(rddId, name, numPartitions, storageLevel, parentIds, scope)
     rddInfo.numCachedPartitions = numCachedPartitions
     rddInfo.memSize = memSize
     rddInfo.externalBlockStoreSize = externalBlockStoreSize
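
A note on the read path above: both new fields are wrapped in Utils.jsonOption, so event logs written before 1.4.0 (which carry neither "Scope" nor "Parent IDs") still deserialize, falling back to None and Seq.empty. A minimal standalone sketch of the same json4s idiom, with illustrative names rather than the JsonProtocol helpers:

    import org.json4s._
    import org.json4s.jackson.JsonMethods.parse

    object ParentIdsCompatSketch {
      implicit val formats: Formats = DefaultFormats

      // Same idea as Utils.jsonOption: a missing field parses to JNothing,
      // which we turn into None instead of letting extract[...] throw.
      private def jsonOption(json: JValue): Option[JValue] = json match {
        case JNothing => None
        case value => Some(value)
      }

      // "Parent IDs" was only added in Spark 1.4.0; older logs simply omit it.
      def parentIdsFrom(json: JValue): Seq[Int] =
        jsonOption(json \ "Parent IDs")
          .map(_.extract[List[JValue]].map(_.extract[Int]))
          .getOrElse(Seq.empty)

      def main(args: Array[String]): Unit = {
        val newFormat = parse("""{"Stage ID": 1, "Parent IDs": [100, 200, 300]}""")
        val oldFormat = parse("""{"Stage ID": 1}""")
        println(parentIdsFrom(newFormat)) // List(100, 200, 300)
        println(parentIdsFrom(oldFormat)) // List()
      }
    }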

http://git-wip-us.apache.org/repos/asf/spark/blob/863ec0cb/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
index 49e6de4..84f787e 100644
--- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
@@ -727,7 +727,7 @@ private object ExecutorAllocationManagerSuite extends PrivateMethodTester {
   private val executorIdleTimeout = 3L
 
   private def createStageInfo(stageId: Int, numTasks: Int): StageInfo = {
-    new StageInfo(stageId, 0, "name", numTasks, Seq.empty, "no details")
+    new StageInfo(stageId, 0, "name", numTasks, Seq.empty, Seq.empty, "no details")
   }
 
   private def createTaskInfo(taskId: Int, taskIndex: Int, executorId: String): TaskInfo = {

http://git-wip-us.apache.org/repos/asf/spark/blob/863ec0cb/core/src/test/scala/org/apache/spark/rdd/RDDOperationScopeSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDOperationScopeSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDOperationScopeSuite.scala
new file mode 100644
index 0000000..d75ecbf
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/rdd/RDDOperationScopeSuite.scala
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rdd
+
+import org.scalatest.{BeforeAndAfter, FunSuite}
+
+import org.apache.spark.{TaskContext, Partition, SparkContext}
+
+/**
+ * Tests for RDDOperationScope: JSON de/serialization and scope nesting via withScope.
+ */
+class RDDOperationScopeSuite extends FunSuite with BeforeAndAfter {
+  private var sc: SparkContext = null
+  private val scope1 = new RDDOperationScope("scope1")
+  private val scope2 = new RDDOperationScope("scope2", parent = Some(scope1))
+  private val scope3 = new RDDOperationScope("scope3", parent = Some(scope2))
+
+  before {
+    sc = new SparkContext("local", "test")
+  }
+
+  after {
+    sc.stop()
+  }
+
+  test("getAllScopes") {
+    assert(scope1.getAllScopes === Seq(scope1))
+    assert(scope2.getAllScopes === Seq(scope1, scope2))
+    assert(scope3.getAllScopes === Seq(scope1, scope2, scope3))
+  }
+
+  test("json de/serialization") {
+    val scope1Json = scope1.toJson
+    val scope2Json = scope2.toJson
+    val scope3Json = scope3.toJson
+    assert(scope1Json === s"""{"id":${scope1.id},"name":"scope1"}""")
+    assert(scope2Json === s"""{"id":${scope2.id},"name":"scope2","parent":$scope1Json}""")
+    assert(scope3Json === s"""{"id":${scope3.id},"name":"scope3","parent":$scope2Json}""")
+    assert(RDDOperationScope.fromJson(scope1Json) === scope1)
+    assert(RDDOperationScope.fromJson(scope2Json) === scope2)
+    assert(RDDOperationScope.fromJson(scope3Json) === scope3)
+  }
+
+  test("withScope") {
+    val rdd0: MyCoolRDD = new MyCoolRDD(sc)
+    var rdd1: MyCoolRDD = null
+    var rdd2: MyCoolRDD = null
+    var rdd3: MyCoolRDD = null
+    RDDOperationScope.withScope(sc, "scope1", allowNesting = false) {
+      rdd1 = new MyCoolRDD(sc)
+      RDDOperationScope.withScope(sc, "scope2", allowNesting = false) {
+        rdd2 = new MyCoolRDD(sc)
+        RDDOperationScope.withScope(sc, "scope3", allowNesting = false) {
+          rdd3 = new MyCoolRDD(sc)
+        }
+      }
+    }
+    assert(rdd0.scope.isEmpty)
+    assert(rdd1.scope.isDefined)
+    assert(rdd2.scope.isDefined)
+    assert(rdd3.scope.isDefined)
+    assert(rdd1.scope.get.getAllScopes.map(_.name) === Seq("scope1"))
+    assert(rdd2.scope.get.getAllScopes.map(_.name) === Seq("scope1"))
+    assert(rdd3.scope.get.getAllScopes.map(_.name) === Seq("scope1"))
+  }
+
+  test("withScope with partial nesting") {
+    val rdd0: MyCoolRDD = new MyCoolRDD(sc)
+    var rdd1: MyCoolRDD = null
+    var rdd2: MyCoolRDD = null
+    var rdd3: MyCoolRDD = null
+    RDDOperationScope.withScope(sc, "scope1", allowNesting = true) { // allow nesting here
+      rdd1 = new MyCoolRDD(sc)
+      RDDOperationScope.withScope(sc, "scope2", allowNesting = false) { // stop nesting here
+        rdd2 = new MyCoolRDD(sc)
+        RDDOperationScope.withScope(sc, "scope3", allowNesting = false) {
+          rdd3 = new MyCoolRDD(sc)
+        }
+      }
+    }
+    assert(rdd0.scope.isEmpty)
+    assert(rdd1.scope.isDefined)
+    assert(rdd2.scope.isDefined)
+    assert(rdd3.scope.isDefined)
+    assert(rdd1.scope.get.getAllScopes.map(_.name) === Seq("scope1"))
+    assert(rdd2.scope.get.getAllScopes.map(_.name) === Seq("scope1", "scope2"))
+    assert(rdd3.scope.get.getAllScopes.map(_.name) === Seq("scope1", "scope2"))
+  }
+
+  test("withScope with multiple layers of nesting") {
+    val rdd0: MyCoolRDD = new MyCoolRDD(sc)
+    var rdd1: MyCoolRDD = null
+    var rdd2: MyCoolRDD = null
+    var rdd3: MyCoolRDD = null
+    RDDOperationScope.withScope(sc, "scope1", allowNesting = true) {
+      rdd1 = new MyCoolRDD(sc)
+      RDDOperationScope.withScope(sc, "scope2", allowNesting = true) {
+        rdd2 = new MyCoolRDD(sc)
+        RDDOperationScope.withScope(sc, "scope3", allowNesting = true) {
+          rdd3 = new MyCoolRDD(sc)
+        }
+      }
+    }
+    assert(rdd0.scope.isEmpty)
+    assert(rdd1.scope.isDefined)
+    assert(rdd2.scope.isDefined)
+    assert(rdd3.scope.isDefined)
+    assert(rdd1.scope.get.getAllScopes.map(_.name) === Seq("scope1"))
+    assert(rdd2.scope.get.getAllScopes.map(_.name) === Seq("scope1", "scope2"))
+    assert(rdd3.scope.get.getAllScopes.map(_.name) === Seq("scope1", "scope2", "scope3"))
+  }
+
+}
+
+private class MyCoolRDD(sc: SparkContext) extends RDD[Int](sc, Nil) {
+  override def getPartitions: Array[Partition] = Array.empty
+  override def compute(p: Partition, context: TaskContext): Iterator[Int] = { Nil.toIterator }
+}
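
For context on the getAllScopes assertions in this suite: the chain is returned root-first and ends with the scope itself. A self-contained sketch of that traversal (an illustrative stand-in, not the RDDOperationScope class from this patch):

    // Illustrative stand-in for the parent-chain traversal the assertions rely on.
    case class Scope(name: String, parent: Option[Scope] = None) {
      // Root-first chain that ends with this scope, e.g. Seq(scope1, scope2, scope3).
      def allScopes: Seq[Scope] =
        parent.map(_.allScopes).getOrElse(Seq.empty) :+ this
    }

    object ScopeChainDemo {
      def main(args: Array[String]): Unit = {
        val scope1 = Scope("scope1")
        val scope2 = Scope("scope2", Some(scope1))
        val scope3 = Scope("scope3", Some(scope2))
        assert(scope3.allScopes.map(_.name) == Seq("scope1", "scope2", "scope3"))
      }
    }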

http://git-wip-us.apache.org/repos/asf/spark/blob/863ec0cb/core/src/test/scala/org/apache/spark/storage/StorageSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/storage/StorageSuite.scala b/core/src/test/scala/org/apache/spark/storage/StorageSuite.scala
index ee1071c..17193dd 100644
--- a/core/src/test/scala/org/apache/spark/storage/StorageSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/StorageSuite.scala
@@ -287,8 +287,8 @@ class StorageSuite extends FunSuite {
 
   // For testing StorageUtils.updateRddInfo
   private def stockRDDInfos: Seq[RDDInfo] = {
-    val info0 = new RDDInfo(0, "0", 10, memAndDisk)
-    val info1 = new RDDInfo(1, "1", 3, memAndDisk)
+    val info0 = new RDDInfo(0, "0", 10, memAndDisk, Seq(3))
+    val info1 = new RDDInfo(1, "1", 3, memAndDisk, Seq(4))
     Seq(info0, info1)
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/863ec0cb/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
index 21d8267..967dd08 100644
--- a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
@@ -34,12 +34,12 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
   val jobCompletionTime = 1421191296660L
 
   private def createStageStartEvent(stageId: Int) = {
-    val stageInfo = new StageInfo(stageId, 0, stageId.toString, 0, null, "")
+    val stageInfo = new StageInfo(stageId, 0, stageId.toString, 0, null, null, "")
     SparkListenerStageSubmitted(stageInfo)
   }
 
   private def createStageEndEvent(stageId: Int, failed: Boolean = false) = {
-    val stageInfo = new StageInfo(stageId, 0, stageId.toString, 0, null, "")
+    val stageInfo = new StageInfo(stageId, 0, stageId.toString, 0, null, null, "")
     if (failed) {
       stageInfo.failureReason = Some("Failed!")
     }
@@ -51,7 +51,7 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
       stageIds: Seq[Int],
       jobGroup: Option[String] = None): SparkListenerJobStart = {
     val stageInfos = stageIds.map { stageId =>
-      new StageInfo(stageId, 0, stageId.toString, 0, null, "")
+      new StageInfo(stageId, 0, stageId.toString, 0, null, null, "")
     }
     val properties: Option[Properties] = jobGroup.map { groupId =>
       val props = new Properties()

http://git-wip-us.apache.org/repos/asf/spark/blob/863ec0cb/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala b/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala
index 0ea9ea4..7b38e6d 100644
--- a/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala
@@ -35,10 +35,10 @@ class StorageTabSuite extends FunSuite with BeforeAndAfter {
   private val none = StorageLevel.NONE
   private val taskInfo = new TaskInfo(0, 0, 0, 0, "big", "dog", TaskLocality.ANY, false)
   private val taskInfo1 = new TaskInfo(1, 1, 1, 1, "big", "cat", TaskLocality.ANY, false)
-  private def rddInfo0 = new RDDInfo(0, "freedom", 100, memOnly)
-  private def rddInfo1 = new RDDInfo(1, "hostage", 200, memOnly)
-  private def rddInfo2 = new RDDInfo(2, "sanity", 300, memAndDisk)
-  private def rddInfo3 = new RDDInfo(3, "grace", 400, memAndDisk)
+  private def rddInfo0 = new RDDInfo(0, "freedom", 100, memOnly, Seq(10))
+  private def rddInfo1 = new RDDInfo(1, "hostage", 200, memOnly, Seq(10))
+  private def rddInfo2 = new RDDInfo(2, "sanity", 300, memAndDisk, Seq(10))
+  private def rddInfo3 = new RDDInfo(3, "grace", 400, memAndDisk, Seq(10))
   private val bm1 = BlockManagerId("big", "dog", 1)
 
   before {
@@ -54,7 +54,7 @@ class StorageTabSuite extends FunSuite with BeforeAndAfter {
     assert(storageListener.rddInfoList.isEmpty)
 
     // 2 RDDs are known, but none are cached
-    val stageInfo0 = new StageInfo(0, 0, "0", 100, Seq(rddInfo0, rddInfo1), "details")
+    val stageInfo0 = new StageInfo(0, 0, "0", 100, Seq(rddInfo0, rddInfo1), Seq.empty, "details")
     bus.postToAll(SparkListenerStageSubmitted(stageInfo0))
     assert(storageListener._rddInfoMap.size === 2)
     assert(storageListener.rddInfoList.isEmpty)
@@ -64,15 +64,16 @@ class StorageTabSuite extends FunSuite with BeforeAndAfter {
     val rddInfo3Cached = rddInfo3
     rddInfo2Cached.numCachedPartitions = 1
     rddInfo3Cached.numCachedPartitions = 1
-    val stageInfo1 = new StageInfo(1, 0, "0", 100, Seq(rddInfo2Cached, rddInfo3Cached), "details")
+    val stageInfo1 = new StageInfo(
+      1, 0, "0", 100, Seq(rddInfo2Cached, rddInfo3Cached), Seq.empty, "details")
     bus.postToAll(SparkListenerStageSubmitted(stageInfo1))
     assert(storageListener._rddInfoMap.size === 4)
     assert(storageListener.rddInfoList.size === 2)
 
     // Submitting RDDInfos with duplicate IDs does nothing
-    val rddInfo0Cached = new RDDInfo(0, "freedom", 100, StorageLevel.MEMORY_ONLY)
+    val rddInfo0Cached = new RDDInfo(0, "freedom", 100, StorageLevel.MEMORY_ONLY, Seq(10))
     rddInfo0Cached.numCachedPartitions = 1
-    val stageInfo0Cached = new StageInfo(0, 0, "0", 100, Seq(rddInfo0), "details")
+    val stageInfo0Cached = new StageInfo(0, 0, "0", 100, Seq(rddInfo0), Seq.empty, "details")
     bus.postToAll(SparkListenerStageSubmitted(stageInfo0Cached))
     assert(storageListener._rddInfoMap.size === 4)
     assert(storageListener.rddInfoList.size === 2)
@@ -88,7 +89,8 @@ class StorageTabSuite extends FunSuite with BeforeAndAfter {
     val rddInfo1Cached = rddInfo1
     rddInfo0Cached.numCachedPartitions = 1
     rddInfo1Cached.numCachedPartitions = 1
-    val stageInfo0 = new StageInfo(0, 0, "0", 100, Seq(rddInfo0Cached, rddInfo1Cached), "details")
+    val stageInfo0 = new StageInfo(
+      0, 0, "0", 100, Seq(rddInfo0Cached, rddInfo1Cached), Seq.empty, "details")
     bus.postToAll(SparkListenerStageSubmitted(stageInfo0))
     assert(storageListener._rddInfoMap.size === 2)
     assert(storageListener.rddInfoList.size === 2)
@@ -108,7 +110,7 @@ class StorageTabSuite extends FunSuite with BeforeAndAfter {
     val myRddInfo1 = rddInfo1
     val myRddInfo2 = rddInfo2
     val stageInfo0 = new StageInfo(
-      0, 0, "0", 100, Seq(myRddInfo0, myRddInfo1, myRddInfo2), "details")
+      0, 0, "0", 100, Seq(myRddInfo0, myRddInfo1, myRddInfo2), Seq.empty, "details")
     bus.postToAll(SparkListenerBlockManagerAdded(1L, bm1, 1000L))
     bus.postToAll(SparkListenerStageSubmitted(stageInfo0))
     assert(storageListener._rddInfoMap.size === 3)
@@ -166,10 +168,10 @@ class StorageTabSuite extends FunSuite with BeforeAndAfter {
 
   test("verify StorageTab contains all cached rdds") {
 
-    val rddInfo0 = new RDDInfo(0, "rdd0", 1, memOnly)
-    val rddInfo1 = new RDDInfo(1, "rdd1", 1 ,memOnly)
-    val stageInfo0 = new StageInfo(0, 0, "stage0", 1, Seq(rddInfo0), "details")
-    val stageInfo1 = new StageInfo(1, 0, "stage1", 1, Seq(rddInfo1), "details")
+    val rddInfo0 = new RDDInfo(0, "rdd0", 1, memOnly, Seq(4))
+    val rddInfo1 = new RDDInfo(1, "rdd1", 1 ,memOnly, Seq(4))
+    val stageInfo0 = new StageInfo(0, 0, "stage0", 1, Seq(rddInfo0), Seq.empty, "details")
+    val stageInfo1 = new StageInfo(1, 0, "stage1", 1, Seq(rddInfo1), Seq.empty, "details")
     val taskMetrics0 = new TaskMetrics
     val taskMetrics1 = new TaskMetrics
     val block0 = (RDDBlockId(0, 1), BlockStatus(memOnly, 100L, 0L, 0L))

http://git-wip-us.apache.org/repos/asf/spark/blob/863ec0cb/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index 0c9cf5b..0c5221d 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -29,6 +29,7 @@ import org.scalatest.FunSuite
 
 import org.apache.spark._
 import org.apache.spark.executor._
+import org.apache.spark.rdd.RDDOperationScope
 import org.apache.spark.scheduler._
 import org.apache.spark.storage._
 
@@ -162,7 +163,7 @@ class JsonProtocolSuite extends FunSuite {
     assertEquals(exceptionFailure, JsonProtocol.taskEndReasonFromJson(oldEvent))
   }
 
-  test("StageInfo backward compatibility") {
+  test("StageInfo backward compatibility (details, accumulables)") {
     val info = makeStageInfo(1, 2, 3, 4L, 5L)
     val newJson = JsonProtocol.stageInfoToJson(info)
 
@@ -297,7 +298,7 @@ class JsonProtocolSuite extends FunSuite {
     val stageIds = Seq[Int](1, 2, 3, 4)
     val stageInfos = stageIds.map(x => makeStageInfo(x, x * 200, x * 300, x * 400, x * 500))
     val dummyStageInfos =
-      stageIds.map(id => new StageInfo(id, 0, "unknown", 0, Seq.empty, "unknown"))
+      stageIds.map(id => new StageInfo(id, 0, "unknown", 0, Seq.empty, Seq.empty, "unknown"))
     val jobStart = SparkListenerJobStart(10, jobSubmissionTime, stageInfos, properties)
     val oldEvent = JsonProtocol.jobStartToJson(jobStart).removeField({_._1 == "Stage Infos"})
     val expectedJobStart =
@@ -323,6 +324,25 @@ class JsonProtocolSuite extends FunSuite {
     assertEquals(expectedJobEnd, JsonProtocol.jobEndFromJson(oldEndEvent))
   }
 
+  test("RDDInfo backward compatibility (scope, parent IDs)") {
+    // Prior to Spark 1.4.0, RDDInfo did not have the "Scope" and "Parent IDs" properties
+    val rddInfo = new RDDInfo(
+      1, "one", 100, StorageLevel.NONE, Seq(1, 6, 8), Some(new RDDOperationScope("fable")))
+    val oldRddInfoJson = JsonProtocol.rddInfoToJson(rddInfo)
+      .removeField({ _._1 == "Parent IDs"})
+      .removeField({ _._1 == "Scope"})
+    val expectedRddInfo = new RDDInfo(1, "one", 100, StorageLevel.NONE, Seq.empty, scope = None)
+    assertEquals(expectedRddInfo, JsonProtocol.rddInfoFromJson(oldRddInfoJson))
+  }
+
+  test("StageInfo backward compatibility (parent IDs)") {
+    // Prior to Spark 1.4.0, StageInfo did not have the "Parent IDs" property
+    val stageInfo = new StageInfo(1, 1, "me-stage", 1, Seq.empty, Seq(1, 2, 3), "details")
+    val oldStageInfo = JsonProtocol.stageInfoToJson(stageInfo).removeField({ _._1 == "Parent IDs"})
+    val expectedStageInfo = new StageInfo(1, 1, "me-stage", 1, Seq.empty, Seq.empty, "details")
+    assertEquals(expectedStageInfo, JsonProtocol.stageInfoFromJson(oldStageInfo))
+  }
+
   /** -------------------------- *
    | Helper test running methods |
    * --------------------------- */
@@ -645,7 +665,7 @@ class JsonProtocolSuite extends FunSuite {
   }
 
   private def makeRddInfo(a: Int, b: Int, c: Int, d: Long, e: Long) = {
-    val r = new RDDInfo(a, "mayor", b, StorageLevel.MEMORY_AND_DISK)
+    val r = new RDDInfo(a, "mayor", b, StorageLevel.MEMORY_AND_DISK, Seq(1, 4, 7))
     r.numCachedPartitions = c
     r.memSize = d
     r.diskSize = e
@@ -654,7 +674,7 @@ class JsonProtocolSuite extends FunSuite {
 
   private def makeStageInfo(a: Int, b: Int, c: Int, d: Long, e: Long) = {
     val rddInfos = (0 until a % 5).map { i => makeRddInfo(a + i, b + i, c + i, d + i, e + i) }
-    val stageInfo = new StageInfo(a, 0, "greetings", b, rddInfos, "details")
+    val stageInfo = new StageInfo(a, 0, "greetings", b, rddInfos, Seq(100, 200, 300), "details")
     val (acc1, acc2) = (makeAccumulableInfo(1), makeAccumulableInfo(2))
     stageInfo.accumulables(acc1.id) = acc1
     stageInfo.accumulables(acc2.id) = acc2
@@ -747,6 +767,7 @@ class JsonProtocolSuite extends FunSuite {
       |    "Stage Name": "greetings",
       |    "Number of Tasks": 200,
       |    "RDD Info": [],
+      |    "Parent IDs": [100, 200, 300],
       |    "Details": "details",
       |    "Accumulables": [
       |      {
@@ -785,6 +806,7 @@ class JsonProtocolSuite extends FunSuite {
       |      {
       |        "RDD ID": 101,
       |        "Name": "mayor",
+      |        "Parent IDs": [1, 4, 7],
       |        "Storage Level": {
       |          "Use Disk": true,
       |          "Use Memory": true,
@@ -799,6 +821,7 @@ class JsonProtocolSuite extends FunSuite {
       |        "Disk Size": 501
       |      }
       |    ],
+      |    "Parent IDs": [100, 200, 300],
       |    "Details": "details",
       |    "Accumulables": [
       |      {
@@ -1168,6 +1191,7 @@ class JsonProtocolSuite extends FunSuite {
       |        {
       |          "RDD ID": 1,
       |          "Name": "mayor",
+      |          "Parent IDs": [1, 4, 7],
       |          "Storage Level": {
       |            "Use Disk": true,
       |            "Use Memory": true,
@@ -1182,6 +1206,7 @@ class JsonProtocolSuite extends FunSuite {
       |          "Disk Size": 500
       |        }
       |      ],
+      |      "Parent IDs": [100, 200, 300],
       |      "Details": "details",
       |      "Accumulables": [
       |        {
@@ -1207,6 +1232,7 @@ class JsonProtocolSuite extends FunSuite {
       |        {
       |          "RDD ID": 2,
       |          "Name": "mayor",
+      |          "Parent IDs": [1, 4, 7],
       |          "Storage Level": {
       |            "Use Disk": true,
       |            "Use Memory": true,
@@ -1223,6 +1249,7 @@ class JsonProtocolSuite extends FunSuite {
       |        {
       |          "RDD ID": 3,
       |          "Name": "mayor",
+      |          "Parent IDs": [1, 4, 7],
       |          "Storage Level": {
       |            "Use Disk": true,
       |            "Use Memory": true,
@@ -1237,6 +1264,7 @@ class JsonProtocolSuite extends FunSuite {
       |          "Disk Size": 1001
       |        }
       |      ],
+      |      "Parent IDs": [100, 200, 300],
       |      "Details": "details",
       |      "Accumulables": [
       |        {
@@ -1262,6 +1290,7 @@ class JsonProtocolSuite extends FunSuite {
       |        {
       |          "RDD ID": 3,
       |          "Name": "mayor",
+      |          "Parent IDs": [1, 4, 7],
       |          "Storage Level": {
       |            "Use Disk": true,
       |            "Use Memory": true,
@@ -1278,6 +1307,7 @@ class JsonProtocolSuite extends FunSuite {
       |        {
       |          "RDD ID": 4,
       |          "Name": "mayor",
+      |          "Parent IDs": [1, 4, 7],
       |          "Storage Level": {
       |            "Use Disk": true,
       |            "Use Memory": true,
@@ -1294,6 +1324,7 @@ class JsonProtocolSuite extends FunSuite {
       |        {
       |          "RDD ID": 5,
       |          "Name": "mayor",
+      |          "Parent IDs": [1, 4, 7],
       |          "Storage Level": {
       |            "Use Disk": true,
       |            "Use Memory": true,
@@ -1308,6 +1339,7 @@ class JsonProtocolSuite extends FunSuite {
       |          "Disk Size": 1502
       |        }
       |      ],
+      |      "Parent IDs": [100, 200, 300],
       |      "Details": "details",
       |      "Accumulables": [
       |        {
@@ -1333,6 +1365,7 @@ class JsonProtocolSuite extends FunSuite {
       |        {
       |          "RDD ID": 4,
       |          "Name": "mayor",
+      |          "Parent IDs": [1, 4, 7],
       |          "Storage Level": {
       |            "Use Disk": true,
       |            "Use Memory": true,
@@ -1349,6 +1382,7 @@ class JsonProtocolSuite extends FunSuite {
       |        {
       |          "RDD ID": 5,
       |          "Name": "mayor",
+      |          "Parent IDs": [1, 4, 7],
       |          "Storage Level": {
       |            "Use Disk": true,
       |            "Use Memory": true,
@@ -1365,6 +1399,7 @@ class JsonProtocolSuite extends FunSuite {
       |        {
       |          "RDD ID": 6,
       |          "Name": "mayor",
+      |          "Parent IDs": [1, 4, 7],
       |          "Storage Level": {
       |            "Use Disk": true,
       |            "Use Memory": true,
@@ -1381,6 +1416,7 @@ class JsonProtocolSuite extends FunSuite {
       |        {
       |          "RDD ID": 7,
       |          "Name": "mayor",
+      |          "Parent IDs": [1, 4, 7],
       |          "Storage Level": {
       |            "Use Disk": true,
       |            "Use Memory": true,
@@ -1395,6 +1431,7 @@ class JsonProtocolSuite extends FunSuite {
       |          "Disk Size": 2003
       |        }
       |      ],
+      |      "Parent IDs": [100, 200, 300],
       |      "Details": "details",
       |      "Accumulables": [
       |        {
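
A final note on the new compatibility tests in JsonProtocolSuite: they simulate pre-1.4.0 event logs by serializing a current object and then stripping the new fields with json4s's removeField before reading it back. A minimal sketch of that trick in isolation (the field and object names here are illustrative, not the suite's helpers):

    import org.json4s._
    import org.json4s.jackson.JsonMethods.{compact, parse, render}

    object RemoveFieldSketch {
      def main(args: Array[String]): Unit = {
        // A "new format" record carrying the field added in 1.4.0.
        val newJson = parse("""{"Stage ID": 1, "Parent IDs": [100, 200, 300]}""")
        // Strip the field to emulate what an older Spark version would have written.
        val oldJson = newJson.removeField { case (name, _) => name == "Parent IDs" }
        println(compact(render(newJson))) // {"Stage ID":1,"Parent IDs":[100,200,300]}
        println(compact(render(oldJson))) // {"Stage ID":1}
      }
    }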

