This is an automated email from the ASF dual-hosted git repository.

jiangxb1987 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 247bebc  [SPARK-28561][WEBUI] DAG viz for barrier-execution mode
247bebc is described below

commit 247bebcf94df77883ac245aea63e7e871fc7aa44
Author: Kousuke Saruta <saru...@oss.nttdata.com>
AuthorDate: Mon Aug 12 22:38:10 2019 -0700

    [SPARK-28561][WEBUI] DAG viz for barrier-execution mode
    
    ## What changes were proposed in this pull request?
    
    In the current UI, we cannot identify which RDDs are barrier RDDs. Visualizing them will make debugging easier.
    The following images show the UI after this change.
    
    ![Screenshot from 2019-07-30 16-30-35](https://user-images.githubusercontent.com/4736016/62110508-83cec100-b2e9-11e9-83b9-bc2e485a4cbe.png)
    ![Screenshot from 2019-07-30 16-31-09](https://user-images.githubusercontent.com/4736016/62110509-83cec100-b2e9-11e9-9e2e-47c4dae23a52.png)
    
    The boxes in pale green denote barrier mode (we might need to discuss which color is most appropriate).
    
    ## How was this patch tested?
    
    Tested manually.
    The images above were produced by the following operations.
    
    ```
    val rdd1 = sc.parallelize(1 to 10)
    val rdd2 = sc.parallelize(1 to 10)
    val rdd3 = rdd1.zip(rdd2).barrier.mapPartitions(identity(_))
    val rdd4 = rdd3.map(identity(_))
    val rdd5 = rdd4.reduceByKey(_+_)
    rdd5.collect
    ```
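    
    For reference only (not part of the tested operations above), a minimal sketch of how the new `barrier` flag flows through the `RDDOperationNode`/`RDDOperationCluster` constructors updated by this change. The concrete values are illustrative, and since both classes are `private[spark]` this only compiles inside Spark's own packages:
    
    ```
    import org.apache.spark.ui.scope.{RDDOperationCluster, RDDOperationNode}
    
    // A node now records whether its RDD is a barrier RDD (illustrative values).
    val barrierNode = RDDOperationNode(
      id = 3, name = "MapPartitionsRDD", cached = false, barrier = true,
      callsite = "mapPartitions at <console>:26")
    
    // A cluster carries the flag too and is rendered with the new "barrier" CSS class.
    val barrierCluster = new RDDOperationCluster("3", true, "mapPartitions\n(barrier mode)")
    barrierCluster.attachChildNode(barrierNode)
    
    // getBarrierClusters only reports nested barrier child clusters, so this cluster reports none.
    assert(barrierCluster.getBarrierClusters.isEmpty)
    ```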
    
    Closes #25296 from sarutak/barrierexec-dagviz.
    
    Authored-by: Kousuke Saruta <saru...@oss.nttdata.com>
    Signed-off-by: Xingbo Jiang <xingbo.ji...@databricks.com>
---
 .../org/apache/spark/ui/static/spark-dag-viz.css   | 12 +++++++++
 .../org/apache/spark/ui/static/spark-dag-viz.js    |  6 +++++
 .../scala/org/apache/spark/status/storeTypes.scala |  4 ++-
 .../scala/org/apache/spark/storage/RDDInfo.scala   |  3 ++-
 .../main/scala/org/apache/spark/ui/UIUtils.scala   |  3 +++
 .../apache/spark/ui/scope/RDDOperationGraph.scala  | 30 +++++++++++++++++-----
 .../scala/org/apache/spark/util/JsonProtocol.scala |  4 ++-
 .../spark/status/AppStatusListenerSuite.scala      |  6 ++---
 .../org/apache/spark/storage/StorageSuite.scala    |  4 +--
 .../spark/ui/scope/RDDOperationGraphSuite.scala    | 10 ++++----
 .../org/apache/spark/util/JsonProtocolSuite.scala  |  7 ++---
 11 files changed, 67 insertions(+), 22 deletions(-)

diff --git a/core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.css b/core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.css
index 9cc5c79..1fbc90b 100644
--- a/core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.css
+++ b/core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.css
@@ -89,6 +89,12 @@
   stroke-width: 2px;
 }
 
+#dag-viz-graph svg.job g.cluster.barrier rect {
+  fill: #B4E9E2;
+  stroke: #32DBC6;
+  stroke-width: 2px;
+}
+
 /* Stage page specific styles */
 
 #dag-viz-graph svg.stage g.cluster rect {
@@ -123,6 +129,12 @@
   stroke-width: 2px;
 }
 
+#dag-viz-graph svg.stage g.cluster.barrier rect {
+  fill: #84E9E2;
+  stroke: #32DBC6;
+  stroke-width: 2px;
+}
+
 .tooltip-inner {
   white-space: pre-wrap;
 }
diff --git a/core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.js b/core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.js
index cf508ac..035d72f 100644
--- a/core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.js
+++ b/core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.js
@@ -172,6 +172,12 @@ function renderDagViz(forJob) {
     svg.selectAll("g." + nodeId).classed("cached", true);
   });
 
+  metadataContainer().selectAll(".barrier-rdd").each(function() {
+    var rddId = d3.select(this).text().trim()
+    var clusterId = VizConstants.clusterPrefix + rddId
+    svg.selectAll("g." + clusterId).classed("barrier", true)
+  });
+
   resizeSvg(svg);
   interpretLineBreak(svg);
 }
diff --git a/core/src/main/scala/org/apache/spark/status/storeTypes.scala b/core/src/main/scala/org/apache/spark/status/storeTypes.scala
index eea47b3..9da5bea 100644
--- a/core/src/main/scala/org/apache/spark/status/storeTypes.scala
+++ b/core/src/main/scala/org/apache/spark/status/storeTypes.scala
@@ -402,7 +402,9 @@ private[spark] class RDDOperationClusterWrapper(
     val childClusters: Seq[RDDOperationClusterWrapper]) {
 
   def toRDDOperationCluster(): RDDOperationCluster = {
-    val cluster = new RDDOperationCluster(id, name)
+    val isBarrier = childNodes.exists(_.barrier)
+    val name = if (isBarrier) this.name + "\n(barrier mode)" else this.name
+    val cluster = new RDDOperationCluster(id, isBarrier, name)
     childNodes.foreach(cluster.attachChildNode)
     childClusters.foreach { child =>
       cluster.attachChildCluster(child.toRDDOperationCluster())
diff --git a/core/src/main/scala/org/apache/spark/storage/RDDInfo.scala b/core/src/main/scala/org/apache/spark/storage/RDDInfo.scala
index 917cfab..27a4d4b 100644
--- a/core/src/main/scala/org/apache/spark/storage/RDDInfo.scala
+++ b/core/src/main/scala/org/apache/spark/storage/RDDInfo.scala
@@ -29,6 +29,7 @@ class RDDInfo(
     var name: String,
     val numPartitions: Int,
     var storageLevel: StorageLevel,
+    val isBarrier: Boolean,
     val parentIds: Seq[Int],
     val callSite: String = "",
     val scope: Option[RDDOperationScope] = None)
@@ -68,6 +69,6 @@ private[spark] object RDDInfo {
       rdd.creationSite.shortForm
     }
     new RDDInfo(rdd.id, rddName, rdd.partitions.length,
-      rdd.getStorageLevel, parentIds, callSite, rdd.scope)
+      rdd.getStorageLevel, rdd.isBarrier(), parentIds, callSite, rdd.scope)
   }
 }
diff --git a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
index 11d3831..70e24bd 100644
--- a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
+++ b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
@@ -425,6 +425,9 @@ private[spark] object UIUtils extends Logging {
               {
                 g.rootCluster.getCachedNodes.map { n =>
                   <div class="cached-rdd">{n.id}</div>
+                } ++
+                g.rootCluster.getBarrierClusters.map { c =>
+                  <div class="barrier-rdd">{c.id}</div>
                 }
               }
             </div>
diff --git a/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala b/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala
index 540c1c4..9ace324 100644
--- a/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala
+++ b/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala
@@ -42,7 +42,12 @@ private[spark] case class RDDOperationGraph(
     rootCluster: RDDOperationCluster)
 
 /** A node in an RDDOperationGraph. This represents an RDD. */
-private[spark] case class RDDOperationNode(id: Int, name: String, cached: Boolean, callsite: String)
+private[spark] case class RDDOperationNode(
+    id: Int,
+    name: String,
+    cached: Boolean,
+    barrier: Boolean,
+    callsite: String)
 
 /**
  * A directed edge connecting two nodes in an RDDOperationGraph.
@@ -56,7 +61,10 @@ private[spark] case class RDDOperationEdge(fromId: Int, toId: Int)
  * This represents any grouping of RDDs, including operation scopes (e.g. textFile, flatMap),
  * stages, jobs, or any higher level construct. A cluster may be nested inside of other clusters.
  */
-private[spark] class RDDOperationCluster(val id: String, private var _name: String) {
+private[spark] class RDDOperationCluster(
+    val id: String,
+    val barrier: Boolean,
+    private var _name: String) {
   private val _childNodes = new ListBuffer[RDDOperationNode]
   private val _childClusters = new ListBuffer[RDDOperationCluster]
 
@@ -75,6 +83,10 @@ private[spark] class RDDOperationCluster(val id: String, private var _name: Stri
     _childNodes.filter(_.cached) ++ _childClusters.flatMap(_.getCachedNodes)
   }
 
+  def getBarrierClusters: Seq[RDDOperationCluster] = {
+    _childClusters.filter(_.barrier) ++ _childClusters.flatMap(_.getBarrierClusters)
+  }
+
   def canEqual(other: Any): Boolean = other.isInstanceOf[RDDOperationCluster]
 
   override def equals(other: Any): Boolean = other match {
@@ -117,7 +129,7 @@ private[spark] object RDDOperationGraph extends Logging {
     val stageClusterId = STAGE_CLUSTER_PREFIX + stage.stageId
     val stageClusterName = s"Stage ${stage.stageId}" +
       { if (stage.attemptNumber == 0) "" else s" (attempt ${stage.attemptNumber})" }
-    val rootCluster = new RDDOperationCluster(stageClusterId, stageClusterName)
+    val rootCluster = new RDDOperationCluster(stageClusterId, false, stageClusterName)
 
     var rootNodeCount = 0
     val addRDDIds = new mutable.HashSet[Int]()
@@ -143,7 +155,7 @@ private[spark] object RDDOperationGraph extends Logging {
 
       // TODO: differentiate between the intention to cache an RDD and whether it's actually cached
       val node = nodes.getOrElseUpdate(rdd.id, RDDOperationNode(
-        rdd.id, rdd.name, rdd.storageLevel != StorageLevel.NONE, rdd.callSite))
+        rdd.id, rdd.name, rdd.storageLevel != StorageLevel.NONE, rdd.isBarrier, rdd.callSite))
       if (rdd.scope.isEmpty) {
         // This RDD has no encompassing scope, so we put it directly in the root cluster
         // This should happen only if an RDD is instantiated outside of a public RDD API
@@ -157,7 +169,8 @@ private[spark] object RDDOperationGraph extends Logging {
         val rddClusters = rddScopes.map { scope =>
           val clusterId = scope.id
           val clusterName = scope.name.replaceAll("\\n", "\\\\n")
-          clusters.getOrElseUpdate(clusterId, new RDDOperationCluster(clusterId, clusterName))
+          clusters.getOrElseUpdate(
+            clusterId, new RDDOperationCluster(clusterId, false, clusterName))
         }
         // Build the cluster hierarchy for this RDD
         rddClusters.sliding(2).foreach { pc =>
@@ -227,7 +240,12 @@ private[spark] object RDDOperationGraph extends Logging {
     } else {
       ""
     }
-    val label = s"${node.name} [${node.id}]$isCached\n${node.callsite}"
+    val isBarrier = if (node.barrier) {
+      " [Barrier]"
+    } else {
+      ""
+    }
+    val label = s"${node.name} 
[${node.id}]$isCached$isBarrier\n${node.callsite}"
     s"""${node.id} [label="${StringEscapeUtils.escapeJava(label)}"]"""
   }
 
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index b8ca4ee..6b06975 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -1049,12 +1049,14 @@ private[spark] object JsonProtocol {
       .map { l => l.extract[List[JValue]].map(_.extract[Int]) }
       .getOrElse(Seq.empty)
     val storageLevel = storageLevelFromJson(json \ "Storage Level")
+    val isBarrier = jsonOption(json \ "Barrier").map(_.extract[Boolean]).getOrElse(false)
     val numPartitions = (json \ "Number of Partitions").extract[Int]
     val numCachedPartitions = (json \ "Number of Cached Partitions").extract[Int]
     val memSize = (json \ "Memory Size").extract[Long]
     val diskSize = (json \ "Disk Size").extract[Long]
 
-    val rddInfo = new RDDInfo(rddId, name, numPartitions, storageLevel, parentIds, callsite, scope)
+    val rddInfo =
+      new RDDInfo(rddId, name, numPartitions, storageLevel, isBarrier, parentIds, callsite, scope)
     rddInfo.numCachedPartitions = numCachedPartitions
     rddInfo.memSize = memSize
     rddInfo.diskSize = diskSize
diff --git a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
index 7d73546..4b71a48 100644
--- a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
@@ -698,8 +698,8 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
     val level = StorageLevel.MEMORY_AND_DISK
 
     // Submit a stage and make sure the RDDs are recorded.
-    val rdd1Info = new RDDInfo(rdd1b1.rddId, "rdd1", 2, level, Nil)
-    val rdd2Info = new RDDInfo(rdd2b1.rddId, "rdd2", 1, level, Nil)
+    val rdd1Info = new RDDInfo(rdd1b1.rddId, "rdd1", 2, level, false, Nil)
+    val rdd2Info = new RDDInfo(rdd2b1.rddId, "rdd2", 1, level, false, Nil)
     val stage = new StageInfo(1, 0, "stage1", 4, Seq(rdd1Info, rdd2Info), Nil, "details1")
     listener.onStageSubmitted(SparkListenerStageSubmitted(stage, new Properties()))
 
@@ -1543,7 +1543,7 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
     val level = StorageLevel.MEMORY_AND_DISK
 
     // Submit a stage and make sure the RDDs are recorded.
-    val rdd1Info = new RDDInfo(rdd1b1.rddId, "rdd1", 2, level, Nil)
+    val rdd1Info = new RDDInfo(rdd1b1.rddId, "rdd1", 2, level, false, Nil)
     val stage = new StageInfo(1, 0, "stage1", 4, Seq(rdd1Info), Nil, "details1")
     listener.onStageSubmitted(SparkListenerStageSubmitted(stage, new Properties()))
 
diff --git a/core/src/test/scala/org/apache/spark/storage/StorageSuite.scala b/core/src/test/scala/org/apache/spark/storage/StorageSuite.scala
index ca35238..5f2abb4 100644
--- a/core/src/test/scala/org/apache/spark/storage/StorageSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/StorageSuite.scala
@@ -123,8 +123,8 @@ class StorageSuite extends SparkFunSuite {
 
   // For testing StorageUtils.updateRddInfo
   private def stockRDDInfos: Seq[RDDInfo] = {
-    val info0 = new RDDInfo(0, "0", 10, memAndDisk, Seq(3))
-    val info1 = new RDDInfo(1, "1", 3, memAndDisk, Seq(4))
+    val info0 = new RDDInfo(0, "0", 10, memAndDisk, false, Seq(3))
+    val info1 = new RDDInfo(1, "1", 3, memAndDisk, false, Seq(4))
     Seq(info0, info1)
   }
 
diff --git a/core/src/test/scala/org/apache/spark/ui/scope/RDDOperationGraphSuite.scala b/core/src/test/scala/org/apache/spark/ui/scope/RDDOperationGraphSuite.scala
index 6ddcb5a..e335451 100644
--- a/core/src/test/scala/org/apache/spark/ui/scope/RDDOperationGraphSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/scope/RDDOperationGraphSuite.scala
@@ -22,14 +22,14 @@ import org.apache.spark.SparkFunSuite
 class RDDOperationGraphSuite extends SparkFunSuite {
   test("Test simple cluster equals") {
     // create a 2-cluster chain with a child
-    val c1 = new RDDOperationCluster("1", "Bender")
-    val c2 = new RDDOperationCluster("2", "Hal")
+    val c1 = new RDDOperationCluster("1", false, "Bender")
+    val c2 = new RDDOperationCluster("2", false, "Hal")
     c1.attachChildCluster(c2)
-    c1.attachChildNode(new RDDOperationNode(3, "Marvin", false, "collect!"))
+    c1.attachChildNode(new RDDOperationNode(3, "Marvin", false, false, "collect!"))
 
     // create an equal cluster, but without the child node
-    val c1copy = new RDDOperationCluster("1", "Bender")
-    val c2copy = new RDDOperationCluster("2", "Hal")
+    val c1copy = new RDDOperationCluster("1", false, "Bender")
+    val c2copy = new RDDOperationCluster("2", false, "Hal")
     c1copy.attachChildCluster(c2copy)
 
     assert(c1 == c1copy)
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index a093fa6..bbf64be 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -366,14 +366,14 @@ class JsonProtocolSuite extends SparkFunSuite {
   test("RDDInfo backward compatibility (scope, parent IDs, callsite)") {
     // "Scope" and "Parent IDs" were introduced in Spark 1.4.0
     // "Callsite" was introduced in Spark 1.6.0
-    val rddInfo = new RDDInfo(1, "one", 100, StorageLevel.NONE, Seq(1, 6, 8),
+    val rddInfo = new RDDInfo(1, "one", 100, StorageLevel.NONE, false, Seq(1, 6, 8),
       "callsite", Some(new RDDOperationScope("fable")))
     val oldRddInfoJson = JsonProtocol.rddInfoToJson(rddInfo)
       .removeField({ _._1 == "Parent IDs"})
       .removeField({ _._1 == "Scope"})
       .removeField({ _._1 == "Callsite"})
     val expectedRddInfo = new RDDInfo(
-      1, "one", 100, StorageLevel.NONE, Seq.empty, "", scope = None)
+      1, "one", 100, StorageLevel.NONE, false, Seq.empty, "", scope = None)
     assertEquals(expectedRddInfo, JsonProtocol.rddInfoFromJson(oldRddInfoJson))
   }
 
@@ -857,7 +857,8 @@ private[spark] object JsonProtocolSuite extends Assertions {
   }
 
   private def makeRddInfo(a: Int, b: Int, c: Int, d: Long, e: Long) = {
-    val r = new RDDInfo(a, "mayor", b, StorageLevel.MEMORY_AND_DISK, Seq(1, 4, 7), a.toString)
+    val r =
+      new RDDInfo(a, "mayor", b, StorageLevel.MEMORY_AND_DISK, false, Seq(1, 4, 7), a.toString)
     r.numCachedPartitions = c
     r.memSize = d
     r.diskSize = e


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
