Repository: spark Updated Branches: refs/heads/branch-1.6 c859be2dd -> 42d933fbb
[SPARK-11112] DAG visualization: display RDD callsite <img width="548" alt="screen shot 2015-11-01 at 9 42 33 am" src="https://cloud.githubusercontent.com/assets/2133137/10870343/2a8cd070-807d-11e5-857a-4ebcace77b5b.png"> mateiz sarutak Author: Andrew Or <and...@databricks.com> Closes #9398 from andrewor14/rdd-callsite. (cherry picked from commit 7f741905b06ed6d3dfbff6db41a3355dab71aa3c) Signed-off-by: Reynold Xin <r...@databricks.com> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/42d933fb Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/42d933fb Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/42d933fb Branch: refs/heads/branch-1.6 Commit: 42d933fbba0584b39bd8218eafc44fb03aeb157d Parents: c859be2 Author: Andrew Or <and...@databricks.com> Authored: Sat Nov 7 05:35:53 2015 +0100 Committer: Reynold Xin <r...@databricks.com> Committed: Mon Nov 9 09:59:20 2015 -0800 ---------------------------------------------------------------------- .../apache/spark/ui/static/spark-dag-viz.css | 4 +++ .../org/apache/spark/storage/RDDInfo.scala | 16 +++++++-- .../spark/ui/scope/RDDOperationGraph.scala | 10 +++--- .../org/apache/spark/util/JsonProtocol.scala | 17 ++++++++- .../scala/org/apache/spark/util/Utils.scala | 1 + .../org/apache/spark/ui/UISeleniumSuite.scala | 14 ++++---- .../apache/spark/util/JsonProtocolSuite.scala | 37 ++++++++++++++++---- 7 files changed, 79 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/42d933fb/core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.css ---------------------------------------------------------------------- diff --git a/core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.css b/core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.css index 3b4ae2e..9cc5c79 100644 --- a/core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.css +++ b/core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.css @@ -122,3 +122,7 @@ stroke: #52C366; stroke-width: 2px; } + +.tooltip-inner { + white-space: pre-wrap; +} http://git-wip-us.apache.org/repos/asf/spark/blob/42d933fb/core/src/main/scala/org/apache/spark/storage/RDDInfo.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/storage/RDDInfo.scala b/core/src/main/scala/org/apache/spark/storage/RDDInfo.scala index 9606262..3fa209b 100644 --- a/core/src/main/scala/org/apache/spark/storage/RDDInfo.scala +++ b/core/src/main/scala/org/apache/spark/storage/RDDInfo.scala @@ -19,7 +19,7 @@ package org.apache.spark.storage import org.apache.spark.annotation.DeveloperApi import org.apache.spark.rdd.{RDDOperationScope, RDD} -import org.apache.spark.util.Utils +import org.apache.spark.util.{CallSite, Utils} @DeveloperApi class RDDInfo( @@ -28,9 +28,20 @@ class RDDInfo( val numPartitions: Int, var storageLevel: StorageLevel, val parentIds: Seq[Int], + val callSite: CallSite, val scope: Option[RDDOperationScope] = None) extends Ordered[RDDInfo] { + def this( + id: Int, + name: String, + numPartitions: Int, + storageLevel: StorageLevel, + parentIds: Seq[Int], + scope: Option[RDDOperationScope] = None) { + this(id, name, numPartitions, storageLevel, parentIds, CallSite.empty, scope) + } + var numCachedPartitions = 0 var memSize = 0L var diskSize = 0L @@ -56,6 +67,7 @@ private[spark] object RDDInfo { def fromRdd(rdd: RDD[_]): RDDInfo = { val rddName = Option(rdd.name).getOrElse(Utils.getFormattedClassName(rdd)) val parentIds = rdd.dependencies.map(_.rdd.id) - new RDDInfo(rdd.id, rddName, rdd.partitions.length, rdd.getStorageLevel, parentIds, rdd.scope) + new RDDInfo(rdd.id, rddName, rdd.partitions.length, + rdd.getStorageLevel, parentIds, rdd.creationSite, rdd.scope) } } http://git-wip-us.apache.org/repos/asf/spark/blob/42d933fb/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala b/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala index 81f168a..2427456 100644 --- a/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala +++ b/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala @@ -23,6 +23,7 @@ import scala.collection.mutable.{StringBuilder, ListBuffer} import org.apache.spark.Logging import org.apache.spark.scheduler.StageInfo import org.apache.spark.storage.StorageLevel +import org.apache.spark.util.CallSite /** * A representation of a generic cluster graph used for storing information on RDD operations. @@ -38,7 +39,7 @@ private[ui] case class RDDOperationGraph( rootCluster: RDDOperationCluster) /** A node in an RDDOperationGraph. This represents an RDD. */ -private[ui] case class RDDOperationNode(id: Int, name: String, cached: Boolean) +private[ui] case class RDDOperationNode(id: Int, name: String, cached: Boolean, callsite: CallSite) /** * A directed edge connecting two nodes in an RDDOperationGraph. @@ -104,8 +105,8 @@ private[ui] object RDDOperationGraph extends Logging { edges ++= rdd.parentIds.map { parentId => RDDOperationEdge(parentId, rdd.id) } // TODO: differentiate between the intention to cache an RDD and whether it's actually cached - val node = nodes.getOrElseUpdate( - rdd.id, RDDOperationNode(rdd.id, rdd.name, rdd.storageLevel != StorageLevel.NONE)) + val node = nodes.getOrElseUpdate(rdd.id, RDDOperationNode( + rdd.id, rdd.name, rdd.storageLevel != StorageLevel.NONE, rdd.callSite)) if (rdd.scope.isEmpty) { // This RDD has no encompassing scope, so we put it directly in the root cluster @@ -177,7 +178,8 @@ private[ui] object RDDOperationGraph extends Logging { /** Return the dot representation of a node in an RDDOperationGraph. */ private def makeDotNode(node: RDDOperationNode): String = { - s"""${node.id} [label="${node.name} [${node.id}]"]""" + val label = s"${node.name} [${node.id}]\n${node.callsite.shortForm}" + s"""${node.id} [label="$label"]""" } /** Update the dot representation of the RDDOperationGraph in cluster to subgraph. */ http://git-wip-us.apache.org/repos/asf/spark/blob/42d933fb/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala index ee2eb58..c9beeb2 100644 --- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala +++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala @@ -398,6 +398,7 @@ private[spark] object JsonProtocol { ("RDD ID" -> rddInfo.id) ~ ("Name" -> rddInfo.name) ~ ("Scope" -> rddInfo.scope.map(_.toJson)) ~ + ("Callsite" -> callsiteToJson(rddInfo.callSite)) ~ ("Parent IDs" -> parentIds) ~ ("Storage Level" -> storageLevel) ~ ("Number of Partitions" -> rddInfo.numPartitions) ~ @@ -407,6 +408,11 @@ private[spark] object JsonProtocol { ("Disk Size" -> rddInfo.diskSize) } + def callsiteToJson(callsite: CallSite): JValue = { + ("Short Form" -> callsite.shortForm) ~ + ("Long Form" -> callsite.longForm) + } + def storageLevelToJson(storageLevel: StorageLevel): JValue = { ("Use Disk" -> storageLevel.useDisk) ~ ("Use Memory" -> storageLevel.useMemory) ~ @@ -851,6 +857,9 @@ private[spark] object JsonProtocol { val scope = Utils.jsonOption(json \ "Scope") .map(_.extract[String]) .map(RDDOperationScope.fromJson) + val callsite = Utils.jsonOption(json \ "Callsite") + .map(callsiteFromJson) + .getOrElse(CallSite.empty) val parentIds = Utils.jsonOption(json \ "Parent IDs") .map { l => l.extract[List[JValue]].map(_.extract[Int]) } .getOrElse(Seq.empty) @@ -863,7 +872,7 @@ private[spark] object JsonProtocol { .getOrElse(json \ "Tachyon Size").extract[Long] val diskSize = (json \ "Disk Size").extract[Long] - val rddInfo = new RDDInfo(rddId, name, numPartitions, storageLevel, parentIds, scope) + val rddInfo = new RDDInfo(rddId, name, numPartitions, storageLevel, parentIds, callsite, scope) rddInfo.numCachedPartitions = numCachedPartitions rddInfo.memSize = memSize rddInfo.externalBlockStoreSize = externalBlockStoreSize @@ -871,6 +880,12 @@ private[spark] object JsonProtocol { rddInfo } + def callsiteFromJson(json: JValue): CallSite = { + val shortForm = (json \ "Short Form").extract[String] + val longForm = (json \ "Long Form").extract[String] + CallSite(shortForm, longForm) + } + def storageLevelFromJson(json: JValue): StorageLevel = { val useDisk = (json \ "Use Disk").extract[Boolean] val useMemory = (json \ "Use Memory").extract[Boolean] http://git-wip-us.apache.org/repos/asf/spark/blob/42d933fb/core/src/main/scala/org/apache/spark/util/Utils.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 5a976ee..316c194 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -57,6 +57,7 @@ private[spark] case class CallSite(shortForm: String, longForm: String) private[spark] object CallSite { val SHORT_FORM = "callSite.short" val LONG_FORM = "callSite.long" + val empty = CallSite("", "") } /** http://git-wip-us.apache.org/repos/asf/spark/blob/42d933fb/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala b/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala index 18eec7d..ceecfd6 100644 --- a/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala +++ b/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala @@ -615,29 +615,29 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B assert(stage0.contains("digraph G {\n subgraph clusterstage_0 {\n " + "label="Stage 0";\n subgraph ")) assert(stage0.contains("{\n label="parallelize";\n " + - "0 [label="ParallelCollectionRDD [0]"];\n }")) + "0 [label="ParallelCollectionRDD [0]")) assert(stage0.contains("{\n label="map";\n " + - "1 [label="MapPartitionsRDD [1]"];\n }")) + "1 [label="MapPartitionsRDD [1]")) assert(stage0.contains("{\n label="groupBy";\n " + - "2 [label="MapPartitionsRDD [2]"];\n }")) + "2 [label="MapPartitionsRDD [2]")) val stage1 = Source.fromURL(sc.ui.get.appUIAddress + "/stages/stage/?id=1&attempt=0&expandDagViz=true").mkString assert(stage1.contains("digraph G {\n subgraph clusterstage_1 {\n " + "label="Stage 1";\n subgraph ")) assert(stage1.contains("{\n label="groupBy";\n " + - "3 [label="ShuffledRDD [3]"];\n }")) + "3 [label="ShuffledRDD [3]")) assert(stage1.contains("{\n label="map";\n " + - "4 [label="MapPartitionsRDD [4]"];\n }")) + "4 [label="MapPartitionsRDD [4]")) assert(stage1.contains("{\n label="groupBy";\n " + - "5 [label="MapPartitionsRDD [5]"];\n }")) + "5 [label="MapPartitionsRDD [5]")) val stage2 = Source.fromURL(sc.ui.get.appUIAddress + "/stages/stage/?id=2&attempt=0&expandDagViz=true").mkString assert(stage2.contains("digraph G {\n subgraph clusterstage_2 {\n " + "label="Stage 2";\n subgraph ")) assert(stage2.contains("{\n label="groupBy";\n " + - "6 [label="ShuffledRDD [6]"];\n }")) + "6 [label="ShuffledRDD [6]")) } } http://git-wip-us.apache.org/repos/asf/spark/blob/42d933fb/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala index 953456c..3f94ef7 100644 --- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala @@ -111,6 +111,7 @@ class JsonProtocolSuite extends SparkFunSuite { test("Dependent Classes") { val logUrlMap = Map("stderr" -> "mystderr", "stdout" -> "mystdout").toMap testRDDInfo(makeRddInfo(2, 3, 4, 5L, 6L)) + testCallsite(CallSite("happy", "birthday")) testStageInfo(makeStageInfo(10, 20, 30, 40L, 50L)) testTaskInfo(makeTaskInfo(999L, 888, 55, 777L, false)) testTaskMetrics(makeTaskMetrics( @@ -163,6 +164,10 @@ class JsonProtocolSuite extends SparkFunSuite { testBlockId(StreamBlockId(1, 2L)) } + /* ============================== * + | Backward compatibility tests | + * ============================== */ + test("ExceptionFailure backward compatibility") { val exceptionFailure = ExceptionFailure("To be", "or not to be", stackTrace, null, None, None) @@ -334,14 +339,17 @@ class JsonProtocolSuite extends SparkFunSuite { assertEquals(expectedJobEnd, JsonProtocol.jobEndFromJson(oldEndEvent)) } - test("RDDInfo backward compatibility (scope, parent IDs)") { - // Prior to Spark 1.4.0, RDDInfo did not have the "Scope" and "Parent IDs" properties - val rddInfo = new RDDInfo( - 1, "one", 100, StorageLevel.NONE, Seq(1, 6, 8), Some(new RDDOperationScope("fable"))) + test("RDDInfo backward compatibility (scope, parent IDs, callsite)") { + // "Scope" and "Parent IDs" were introduced in Spark 1.4.0 + // "Callsite" was introduced in Spark 1.6.0 + val rddInfo = new RDDInfo(1, "one", 100, StorageLevel.NONE, Seq(1, 6, 8), + CallSite("short", "long"), Some(new RDDOperationScope("fable"))) val oldRddInfoJson = JsonProtocol.rddInfoToJson(rddInfo) .removeField({ _._1 == "Parent IDs"}) .removeField({ _._1 == "Scope"}) - val expectedRddInfo = new RDDInfo(1, "one", 100, StorageLevel.NONE, Seq.empty, scope = None) + .removeField({ _._1 == "Callsite"}) + val expectedRddInfo = new RDDInfo( + 1, "one", 100, StorageLevel.NONE, Seq.empty, CallSite.empty, scope = None) assertEquals(expectedRddInfo, JsonProtocol.rddInfoFromJson(oldRddInfoJson)) } @@ -389,6 +397,11 @@ class JsonProtocolSuite extends SparkFunSuite { assertEquals(info, newInfo) } + private def testCallsite(callsite: CallSite): Unit = { + val newCallsite = JsonProtocol.callsiteFromJson(JsonProtocol.callsiteToJson(callsite)) + assert(callsite === newCallsite) + } + private def testStageInfo(info: StageInfo) { val newInfo = JsonProtocol.stageInfoFromJson(JsonProtocol.stageInfoToJson(info)) assertEquals(info, newInfo) @@ -713,7 +726,8 @@ class JsonProtocolSuite extends SparkFunSuite { } private def makeRddInfo(a: Int, b: Int, c: Int, d: Long, e: Long) = { - val r = new RDDInfo(a, "mayor", b, StorageLevel.MEMORY_AND_DISK, Seq(1, 4, 7)) + val r = new RDDInfo(a, "mayor", b, StorageLevel.MEMORY_AND_DISK, + Seq(1, 4, 7), CallSite(a.toString, b.toString)) r.numCachedPartitions = c r.memSize = d r.diskSize = e @@ -856,6 +870,7 @@ class JsonProtocolSuite extends SparkFunSuite { | { | "RDD ID": 101, | "Name": "mayor", + | "Callsite": {"Short Form": "101", "Long Form": "201"}, | "Parent IDs": [1, 4, 7], | "Storage Level": { | "Use Disk": true, @@ -1258,6 +1273,7 @@ class JsonProtocolSuite extends SparkFunSuite { | { | "RDD ID": 1, | "Name": "mayor", + | "Callsite": {"Short Form": "1", "Long Form": "200"}, | "Parent IDs": [1, 4, 7], | "Storage Level": { | "Use Disk": true, @@ -1301,6 +1317,7 @@ class JsonProtocolSuite extends SparkFunSuite { | { | "RDD ID": 2, | "Name": "mayor", + | "Callsite": {"Short Form": "2", "Long Form": "400"}, | "Parent IDs": [1, 4, 7], | "Storage Level": { | "Use Disk": true, @@ -1318,6 +1335,7 @@ class JsonProtocolSuite extends SparkFunSuite { | { | "RDD ID": 3, | "Name": "mayor", + | "Callsite": {"Short Form": "3", "Long Form": "401"}, | "Parent IDs": [1, 4, 7], | "Storage Level": { | "Use Disk": true, @@ -1361,6 +1379,7 @@ class JsonProtocolSuite extends SparkFunSuite { | { | "RDD ID": 3, | "Name": "mayor", + | "Callsite": {"Short Form": "3", "Long Form": "600"}, | "Parent IDs": [1, 4, 7], | "Storage Level": { | "Use Disk": true, @@ -1378,6 +1397,7 @@ class JsonProtocolSuite extends SparkFunSuite { | { | "RDD ID": 4, | "Name": "mayor", + | "Callsite": {"Short Form": "4", "Long Form": "601"}, | "Parent IDs": [1, 4, 7], | "Storage Level": { | "Use Disk": true, @@ -1395,6 +1415,7 @@ class JsonProtocolSuite extends SparkFunSuite { | { | "RDD ID": 5, | "Name": "mayor", + | "Callsite": {"Short Form": "5", "Long Form": "602"}, | "Parent IDs": [1, 4, 7], | "Storage Level": { | "Use Disk": true, @@ -1438,6 +1459,7 @@ class JsonProtocolSuite extends SparkFunSuite { | { | "RDD ID": 4, | "Name": "mayor", + | "Callsite": {"Short Form": "4", "Long Form": "800"}, | "Parent IDs": [1, 4, 7], | "Storage Level": { | "Use Disk": true, @@ -1455,6 +1477,7 @@ class JsonProtocolSuite extends SparkFunSuite { | { | "RDD ID": 5, | "Name": "mayor", + | "Callsite": {"Short Form": "5", "Long Form": "801"}, | "Parent IDs": [1, 4, 7], | "Storage Level": { | "Use Disk": true, @@ -1472,6 +1495,7 @@ class JsonProtocolSuite extends SparkFunSuite { | { | "RDD ID": 6, | "Name": "mayor", + | "Callsite": {"Short Form": "6", "Long Form": "802"}, | "Parent IDs": [1, 4, 7], | "Storage Level": { | "Use Disk": true, @@ -1489,6 +1513,7 @@ class JsonProtocolSuite extends SparkFunSuite { | { | "RDD ID": 7, | "Name": "mayor", + | "Callsite": {"Short Form": "7", "Long Form": "803"}, | "Parent IDs": [1, 4, 7], | "Storage Level": { | "Use Disk": true, --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org