Repository: spark
Updated Branches:
  refs/heads/master 5b62bef8c -> f3391ff2b


[SPARK-8889] [CORE] Fix for OOM for graph creation

Fix an OOM when building the DAG visualization's DOT graph: makeDotSubgraph now appends into a single StringBuilder passed down the recursion, instead of materializing an intermediate String for every nested subgraph.

Author: Joshi <rekhajo...@gmail.com>
Author: Rekha Joshi <rekhajo...@gmail.com>

Closes #7602 from rekhajoshm/SPARK-8889.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f3391ff2
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f3391ff2
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f3391ff2

Branch: refs/heads/master
Commit: f3391ff2b8b9c1f1308755dc223776692e3b7725
Parents: 5b62bef
Author: Joshi <rekhajo...@gmail.com>
Authored: Wed Aug 19 21:23:02 2015 +0100
Committer: Sean Owen <so...@cloudera.com>
Committed: Wed Aug 19 21:23:02 2015 +0100

----------------------------------------------------------------------
 .../spark/ui/scope/RDDOperationGraph.scala      | 23 ++++++------
 .../org/apache/spark/ui/UISeleniumSuite.scala   | 39 ++++++++++++++++++++
 2 files changed, 51 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/f3391ff2/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala b/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala
index ffea981..81f168a 100644
--- a/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala
+++ b/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.ui.scope
 
 import scala.collection.mutable
-import scala.collection.mutable.ListBuffer
+import scala.collection.mutable.{StringBuilder, ListBuffer}
 
 import org.apache.spark.Logging
 import org.apache.spark.scheduler.StageInfo
@@ -167,7 +167,7 @@ private[ui] object RDDOperationGraph extends Logging {
   def makeDotFile(graph: RDDOperationGraph): String = {
     val dotFile = new StringBuilder
     dotFile.append("digraph G {\n")
-    dotFile.append(makeDotSubgraph(graph.rootCluster, indent = "  "))
+    makeDotSubgraph(dotFile, graph.rootCluster, indent = "  ")
     graph.edges.foreach { edge => dotFile.append(s"""  ${edge.fromId}->${edge.toId};\n""") }
     dotFile.append("}")
     val result = dotFile.toString()
@@ -180,18 +180,19 @@ private[ui] object RDDOperationGraph extends Logging {
     s"""${node.id} [label="${node.name} [${node.id}]"]"""
   }
 
-  /** Return the dot representation of a subgraph in an RDDOperationGraph. */
-  private def makeDotSubgraph(cluster: RDDOperationCluster, indent: String): String = {
-    val subgraph = new StringBuilder
-    subgraph.append(indent + s"subgraph cluster${cluster.id} {\n")
-    subgraph.append(indent + s"""  label="${cluster.name}";\n""")
+  /** Update the dot representation of the RDDOperationGraph in cluster to subgraph. */
+  private def makeDotSubgraph(
+      subgraph: StringBuilder,
+      cluster: RDDOperationCluster,
+      indent: String): Unit = {
+    subgraph.append(indent).append(s"subgraph cluster${cluster.id} {\n")
+    subgraph.append(indent).append(s"""  label="${cluster.name}";\n""")
     cluster.childNodes.foreach { node =>
-      subgraph.append(indent + s"  ${makeDotNode(node)};\n")
+      subgraph.append(indent).append(s"  ${makeDotNode(node)};\n")
     }
     cluster.childClusters.foreach { cscope =>
-      subgraph.append(makeDotSubgraph(cscope, indent + "  "))
+      makeDotSubgraph(subgraph, cscope, indent + "  ")
     }
-    subgraph.append(indent + "}\n")
-    subgraph.toString()
+    subgraph.append(indent).append("}\n")
   }
 }
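
The rewritten makeDotSubgraph threads one shared StringBuilder through the
recursion instead of having every level build and return its own String,
which the parent then copied into yet another String. For deeply nested
operation scopes those intermediate copies grow roughly quadratically,
which is the OOM this patch fixes. A minimal standalone sketch of the
pattern (Scope, render, and DotSketch are hypothetical names for
illustration, not Spark APIs):

    import scala.collection.mutable.StringBuilder

    // Hypothetical stand-in for RDDOperationCluster: a named cluster
    // with child clusters.
    case class Scope(name: String, children: Seq[Scope] = Nil)

    object DotSketch {
      // Append directly into the caller's builder, as makeDotSubgraph now
      // does; no per-level intermediate String is materialized.
      def render(sb: StringBuilder, scope: Scope, indent: String): Unit = {
        sb.append(indent).append(s"subgraph cluster_${scope.name} {\n")
        scope.children.foreach { child => render(sb, child, indent + "  ") }
        sb.append(indent).append("}\n")
      }

      def main(args: Array[String]): Unit = {
        // Deep nesting is the case that used to blow up: each level of the
        // old return-a-String version re-copied all of its children's output.
        val deep = (1 to 1000).foldLeft(Scope("leaf")) { (inner, i) =>
          Scope(s"s$i", Seq(inner))
        }
        val sb = new StringBuilder("digraph G {\n")
        render(sb, deep, indent = "  ")
        sb.append("}")
        println(s"rendered ${sb.length} characters in a single buffer")
      }
    }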

http://git-wip-us.apache.org/repos/asf/spark/blob/f3391ff2/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala b/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala
index 3aa672f..69888b2 100644
--- a/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.ui
 import java.net.{HttpURLConnection, URL}
 import javax.servlet.http.{HttpServletResponse, HttpServletRequest}
 
+import scala.io.Source
 import scala.collection.JavaConversions._
 import scala.xml.Node
 
@@ -603,6 +604,44 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B
     }
   }
 
+  test("job stages should have expected dotfile under DAG visualization") {
+    withSpark(newSparkContext()) { sc =>
+      // Create a multi-stage job
+      val rdd =
+        sc.parallelize(Seq(1, 2, 3)).map(identity).groupBy(identity).map(identity).groupBy(identity)
+      rdd.count()
+
+      val stage0 = Source.fromURL(sc.ui.get.appUIAddress +
+        "/stages/stage/?id=0&attempt=0&expandDagViz=true").mkString
+      assert(stage0.contains("digraph G {\n  subgraph clusterstage_0 {\n    " +
+        "label=&quot;Stage 0&quot;;\n    subgraph "))
+      assert(stage0.contains("{\n      label=&quot;parallelize&quot;;\n      " 
+
+        "0 [label=&quot;ParallelCollectionRDD [0]&quot;];\n    }"))
+      assert(stage0.contains("{\n      label=&quot;map&quot;;\n      " +
+        "1 [label=&quot;MapPartitionsRDD [1]&quot;];\n    }"))
+      assert(stage0.contains("{\n      label=&quot;groupBy&quot;;\n      " +
+        "2 [label=&quot;MapPartitionsRDD [2]&quot;];\n    }"))
+
+      val stage1 = Source.fromURL(sc.ui.get.appUIAddress +
+        "/stages/stage/?id=1&attempt=0&expandDagViz=true").mkString
+      assert(stage1.contains("digraph G {\n  subgraph clusterstage_1 {\n    " +
+        "label=&quot;Stage 1&quot;;\n    subgraph "))
+      assert(stage1.contains("{\n      label=&quot;groupBy&quot;;\n      " +
+        "3 [label=&quot;ShuffledRDD [3]&quot;];\n    }"))
+      assert(stage1.contains("{\n      label=&quot;map&quot;;\n      " +
+        "4 [label=&quot;MapPartitionsRDD [4]&quot;];\n    }"))
+      assert(stage1.contains("{\n      label=&quot;groupBy&quot;;\n      " +
+        "5 [label=&quot;MapPartitionsRDD [5]&quot;];\n    }"))
+
+      val stage2 = Source.fromURL(sc.ui.get.appUIAddress +
+        "/stages/stage/?id=2&attempt=0&expandDagViz=true").mkString
+      assert(stage2.contains("digraph G {\n  subgraph clusterstage_2 {\n    " +
+        "label=&quot;Stage 2&quot;;\n    subgraph "))
+      assert(stage2.contains("{\n      label=&quot;groupBy&quot;;\n      " +
+        "6 [label=&quot;ShuffledRDD [6]&quot;];\n    }"))
+    }
+  }
+
   def goToUi(sc: SparkContext, path: String): Unit = {
     goToUi(sc.ui.get, path)
   }
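
The test above exercises the fix end to end: it fetches each stage page
with expandDagViz=true, which embeds the HTML-escaped dot source (hence
the &quot; entities) in the returned page, and asserts on that source. A
minimal sketch of the same check outside the suite, assuming it is
compiled under the org.apache.spark package (sc.ui is private[spark],
which is why the suite can reach it; DagVizCheck is a hypothetical name):

    import org.apache.spark.{SparkConf, SparkContext}
    import scala.io.Source

    object DagVizCheck {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(
          new SparkConf().setMaster("local").setAppName("dagviz-check"))
        try {
          // Run a shuffle so the job has at least two stages in the UI.
          sc.parallelize(1 to 10).groupBy(identity).count()
          // Same URL shape as the test above.
          val page = Source.fromURL(sc.ui.get.appUIAddress +
            "/stages/stage/?id=0&attempt=0&expandDagViz=true").mkString
          println(page.contains("digraph G {"))  // expect: true
        } finally {
          sc.stop()
        }
      }
    }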

