Repository: spark
Updated Branches:
  refs/heads/master 900917541 -> 6600786dd


[SPARK-11361][STREAMING] Show scopes of RDD operations inside DStream.foreachRDD and DStream.transform in DAG viz

Currently, when a DStream sets the scope for the RDDs it generates, that scope cannot be 
overridden by the RDD operations. So in the case of `DStream.foreachRDD`, all the RDDs 
generated inside `foreachRDD` get the same scope, `foreachRDD <time>`, as set by the 
`ForeachDStream`. This makes it hard to debug the generated RDDs in the RDD DAG viz in 
the Spark UI.

This patch allows the RDD operations inside `DStream.transform` and 
`DStream.foreachRDD` to append their own scopes to the earlier DStream scope.
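
For example, here is a minimal sketch of the `DStream.transform` case (the socket source, app 
name, and variable names below are only illustrative, not part of this patch):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Hypothetical setup, just to make the sketch self-contained.
val conf = new SparkConf().setMaster("local[2]").setAppName("TransformScopeDemo")
val ssc = new StreamingContext(conf, Seconds(1))
val lines = ssc.socketTextStream("localhost", 9999, StorageLevel.MEMORY_AND_DISK_SER)

// With this patch, the inner map/reduceByKey get their own nested scopes under the
// "transform <time>" scope in the RDD DAG viz instead of being collapsed into it.
val counts = lines.transform { rdd =>
  rdd.map(_ -> 1).reduceByKey(_ + _)
}
counts.print()

ssc.start()
ssc.awaitTermination()
```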

I have also slightly tweaked how callsites are set, so that the short callsite reflects the 
RDD operation name and line number. This tweak is necessary because callsites are not managed 
through scopes (which support nesting and overriding), and I didn't want to add another local 
property to control nesting and overriding of callsites.
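
For reference, a hedged illustration of the call-site mechanism this builds on, assuming an 
existing SparkContext `sc` (`setCallSite`/`clearCallSite` are existing public `SparkContext` 
methods; the label string below is made up). After this change, `getCallSite()` falls back to 
the automatically derived call site for whichever form has not been set explicitly:

```scala
// Explicitly set the short call site: RDDs created while this is set carry this
// label, and their stages show it in the UI.
sc.setCallSite("ingest at MyJob.scala:42")  // hypothetical label
val labelled = sc.parallelize(1 to 10).map(_ * 2)

// Clear it: subsequent RDDs fall back to the real call site of the RDD operation.
// This is the same effect the streaming code now uses when displayInnerRDDOps is true
// (it nulls out the call-site local properties).
sc.clearCallSite()
val unlabelled = sc.parallelize(1 to 10).map(_ * 2)
```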

## Before:
![image](https://cloud.githubusercontent.com/assets/663212/10808548/fa71c0c4-7da9-11e5-9af0-5737793a146f.png)

## After:
![image](https://cloud.githubusercontent.com/assets/663212/10808659/37bc45b6-7dab-11e5-8041-c20be6a9bc26.png)

The code that was used to generate this is:
```scala
val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.foreachRDD { rdd =>
  val temp = rdd.map { _ -> 1 }.reduceByKey(_ + _)
  val temp2 = temp.map { _ -> 1 }.reduceByKey(_ + _)
  val count = temp2.count()
  println(count)
}
```

Notes:
- The inner scopes of the RDD operations `map`/`reduceByKey` inside `foreachRDD` are visible.
- The short callsites of the stages refer to the line numbers of the RDD ops rather than the 
same line number of `foreachRDD` in all three cases.
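
The new tests in `DStreamScopeSuite` check this behavior programmatically. Roughly, a simplified 
sketch of the `foreachRDD` check (here `inputStream` is a dummy input DStream as in the suite; 
the sketch uses Spark-internal members such as `RDD.scope`, `DStream.baseScope` and `ssc.graph`, 
so it only compiles from inside Spark's own streaming test sources, and the batch-driving 
boilerplate is omitted):

```scala
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.rdd.{RDD, RDDOperationScope}

val generatedRDDs = new ArrayBuffer[RDD[(Int, Int)]]
inputStream.foreachRDD { rdd =>
  generatedRDDs += rdd.map { _ -> 1 }.reduceByKey(_ + _)
}
// ... start the context and let a few batches run ...

// The output stream itself keeps "foreachRDD" as its base scope name,
val foreachBaseScope =
  ssc.graph.getOutputStreams().head.baseScope.map(RDDOperationScope.fromJson)
assert(foreachBaseScope.get.name === "foreachRDD")

// while each RDD created inside the closure carries its own "reduceByKey" scope,
// nested under a per-batch child of the foreachRDD scope.
generatedRDDs.foreach { rdd =>
  assert(rdd.scope.get.name === "reduceByKey")
  assert(rdd.scope.get.parent.isDefined)
}
```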

Author: Tathagata Das <tathagata.das1...@gmail.com>

Closes #9315 from tdas/SPARK-11361.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6600786d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6600786d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6600786d

Branch: refs/heads/master
Commit: 6600786dddc89cb16779ee56b9173f63a3af3f27
Parents: 9009175
Author: Tathagata Das <tathagata.das1...@gmail.com>
Authored: Tue Nov 10 16:54:06 2015 -0800
Committer: Andrew Or <and...@databricks.com>
Committed: Tue Nov 10 16:54:06 2015 -0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/SparkContext.scala   |  9 +--
 .../spark/streaming/TestOutputStream.scala      |  2 +-
 .../spark/streaming/dstream/DStream.scala       | 63 +++++++++++++---
 .../streaming/dstream/ForEachDStream.scala      | 14 +++-
 .../streaming/dstream/TransformedDStream.scala  | 13 ++++
 .../spark/streaming/DStreamScopeSuite.scala     | 75 ++++++++++++++++----
 .../apache/spark/streaming/TestSuiteBase.scala  |  4 +-
 7 files changed, 147 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/6600786d/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 7421821..67270c3 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1787,10 +1787,11 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
   * has overridden the call site using `setCallSite()`, this will return the user's version.
    */
   private[spark] def getCallSite(): CallSite = {
-    Option(getLocalProperty(CallSite.SHORT_FORM)).map { case shortCallSite =>
-      val longCallSite = Option(getLocalProperty(CallSite.LONG_FORM)).getOrElse("")
-      CallSite(shortCallSite, longCallSite)
-    }.getOrElse(Utils.getCallSite())
+    val callSite = Utils.getCallSite()
+    CallSite(
+      Option(getLocalProperty(CallSite.SHORT_FORM)).getOrElse(callSite.shortForm),
+      Option(getLocalProperty(CallSite.LONG_FORM)).getOrElse(callSite.longForm)
+    )
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/6600786d/external/flume/src/test/scala/org/apache/spark/streaming/TestOutputStream.scala
----------------------------------------------------------------------
diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/TestOutputStream.scala b/external/flume/src/test/scala/org/apache/spark/streaming/TestOutputStream.scala
index 1a90000..79077e4 100644
--- a/external/flume/src/test/scala/org/apache/spark/streaming/TestOutputStream.scala
+++ b/external/flume/src/test/scala/org/apache/spark/streaming/TestOutputStream.scala
@@ -37,7 +37,7 @@ class TestOutputStream[T: ClassTag](parent: DStream[T],
   extends ForEachDStream[T](parent, (rdd: RDD[T], t: Time) => {
     val collected = rdd.collect()
     output += collected
-  }) {
+  }, false) {
 
   // This is to clear the output buffer every it is read from a checkpoint
   @throws(classOf[IOException])

http://git-wip-us.apache.org/repos/asf/spark/blob/6600786d/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
index 1da0b0a..1a6edf9 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
@@ -341,7 +341,7 @@ abstract class DStream[T: ClassTag] (
       // of RDD generation, else generate nothing.
       if (isTimeValid(time)) {
 
-        val rddOption = createRDDWithLocalProperties(time) {
+        val rddOption = createRDDWithLocalProperties(time, displayInnerRDDOps = false) {
          // Disable checks for existing output directories in jobs launched by the streaming
          // scheduler, since we may need to write output to an existing directory during checkpoint
          // recovery; see SPARK-4835 for more details. We need to have this call here because
@@ -373,27 +373,52 @@ abstract class DStream[T: ClassTag] (
   /**
    * Wrap a body of code such that the call site and operation scope
    * information are passed to the RDDs created in this body properly.
-   */
-  protected def createRDDWithLocalProperties[U](time: Time)(body: => U): U = {
+   * @param body RDD creation code to execute with certain local properties.
+   * @param time Current batch time that should be embedded in the scope names
+   * @param displayInnerRDDOps Whether the detailed callsites and scopes of the inner RDDs generated
+   *                           by `body` will be displayed in the UI; only the scope and callsite
+   *                           of the DStream operation that generated `this` will be displayed.
+   */
+  protected[streaming] def createRDDWithLocalProperties[U](
+      time: Time,
+      displayInnerRDDOps: Boolean)(body: => U): U = {
     val scopeKey = SparkContext.RDD_SCOPE_KEY
     val scopeNoOverrideKey = SparkContext.RDD_SCOPE_NO_OVERRIDE_KEY
     // Pass this DStream's operation scope and creation site information to RDDs through
     // thread-local properties in our SparkContext. Since this method may be called from another
     // DStream, we need to temporarily store any old scope and creation site information to
     // restore them later after setting our own.
-    val prevCallSite = ssc.sparkContext.getCallSite()
+    val prevCallSite = CallSite(
+      ssc.sparkContext.getLocalProperty(CallSite.SHORT_FORM),
+      ssc.sparkContext.getLocalProperty(CallSite.LONG_FORM)
+    )
     val prevScope = ssc.sparkContext.getLocalProperty(scopeKey)
     val prevScopeNoOverride = ssc.sparkContext.getLocalProperty(scopeNoOverrideKey)
 
     try {
-      ssc.sparkContext.setCallSite(creationSite)
+      if (displayInnerRDDOps) {
+        // Unset the short form call site, so that generated RDDs get their own
+        ssc.sparkContext.setLocalProperty(CallSite.SHORT_FORM, null)
+        ssc.sparkContext.setLocalProperty(CallSite.LONG_FORM, null)
+      } else {
+        // Set the callsite, so that the generated RDDs get the DStream's call site and
+        // the internal RDD call sites do not get displayed
+        ssc.sparkContext.setCallSite(creationSite)
+      }
+
       // Use the DStream's base scope for this RDD so we can (1) preserve the higher level
       // DStream operation name, and (2) share this scope with other DStreams created in the
       // same operation. Disallow nesting so that low-level Spark primitives do not show up.
       // TODO: merge callsites with scopes so we can just reuse the code there
       makeScope(time).foreach { s =>
         ssc.sparkContext.setLocalProperty(scopeKey, s.toJson)
-        ssc.sparkContext.setLocalProperty(scopeNoOverrideKey, "true")
+        if (displayInnerRDDOps) {
+          // Allow inner RDDs to add inner scopes
+          ssc.sparkContext.setLocalProperty(scopeNoOverrideKey, null)
+        } else {
+          // Do not allow inner RDDs to override the scope set by DStream
+          ssc.sparkContext.setLocalProperty(scopeNoOverrideKey, "true")
+        }
       }
 
       body
@@ -628,7 +653,7 @@ abstract class DStream[T: ClassTag] (
    */
   def foreachRDD(foreachFunc: RDD[T] => Unit): Unit = ssc.withScope {
     val cleanedF = context.sparkContext.clean(foreachFunc, false)
-    this.foreachRDD((r: RDD[T], t: Time) => cleanedF(r))
+    foreachRDD((r: RDD[T], t: Time) => cleanedF(r), displayInnerRDDOps = true)
   }
 
   /**
@@ -639,7 +664,23 @@ abstract class DStream[T: ClassTag] (
     // because the DStream is reachable from the outer object here, and because
     // DStreams can't be serialized with closures, we can't proactively check
     // it for serializability and so we pass the optional false to SparkContext.clean
-    new ForEachDStream(this, context.sparkContext.clean(foreachFunc, false)).register()
+    foreachRDD(foreachFunc, displayInnerRDDOps = true)
+  }
+
+  /**
+   * Apply a function to each RDD in this DStream. This is an output operator, so
+   * 'this' DStream will be registered as an output stream and therefore materialized.
+   * @param foreachFunc foreachRDD function
+   * @param displayInnerRDDOps Whether the detailed callsites and scopes of the RDDs generated
+   *                           in the `foreachFunc` to be displayed in the UI. If `false`, then
+   *                           only the scopes and callsites of `foreachRDD` will override those
+   *                           of the RDDs on the display.
+   */
+  private def foreachRDD(
+      foreachFunc: (RDD[T], Time) => Unit,
+      displayInnerRDDOps: Boolean): Unit = {
+    new ForEachDStream(this,
+      context.sparkContext.clean(foreachFunc, false), displayInnerRDDOps).register()
   }
 
   /**
@@ -730,7 +771,7 @@ abstract class DStream[T: ClassTag] (
         // scalastyle:on println
       }
     }
-    new ForEachDStream(this, context.sparkContext.clean(foreachFunc)).register()
+    foreachRDD(context.sparkContext.clean(foreachFunc), displayInnerRDDOps = false)
   }
 
   /**
@@ -900,7 +941,7 @@ abstract class DStream[T: ClassTag] (
       val file = rddToFileName(prefix, suffix, time)
       rdd.saveAsObjectFile(file)
     }
-    this.foreachRDD(saveFunc)
+    this.foreachRDD(saveFunc, displayInnerRDDOps = false)
   }
 
   /**
@@ -913,7 +954,7 @@ abstract class DStream[T: ClassTag] (
       val file = rddToFileName(prefix, suffix, time)
       rdd.saveAsTextFile(file)
     }
-    this.foreachRDD(saveFunc)
+    this.foreachRDD(saveFunc, displayInnerRDDOps = false)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/6600786d/streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala
index c109cec..4410a99 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala
@@ -22,10 +22,19 @@ import org.apache.spark.streaming.{Duration, Time}
 import org.apache.spark.streaming.scheduler.Job
 import scala.reflect.ClassTag
 
+/**
+ * An internal DStream used to represent output operations like DStream.foreachRDD.
+ * @param parent        Parent DStream
+ * @param foreachFunc   Function to apply on each RDD generated by the parent DStream
+ * @param displayInnerRDDOps Whether the detailed callsites and scopes of the RDDs generated
+ *                           by `foreachFunc` will be displayed in the UI; only the scope and
+ *                           callsite of `DStream.foreachRDD` will be displayed.
+ */
 private[streaming]
 class ForEachDStream[T: ClassTag] (
     parent: DStream[T],
-    foreachFunc: (RDD[T], Time) => Unit
+    foreachFunc: (RDD[T], Time) => Unit,
+    displayInnerRDDOps: Boolean
   ) extends DStream[Unit](parent.ssc) {
 
   override def dependencies: List[DStream[_]] = List(parent)
@@ -37,8 +46,7 @@ class ForEachDStream[T: ClassTag] (
   override def generateJob(time: Time): Option[Job] = {
     parent.getOrCompute(time) match {
       case Some(rdd) =>
-        val jobFunc = () => createRDDWithLocalProperties(time) {
-          ssc.sparkContext.setCallSite(creationSite)
+        val jobFunc = () => createRDDWithLocalProperties(time, displayInnerRDDOps) {
           foreachFunc(rdd, time)
         }
         Some(new Job(time, jobFunc))

http://git-wip-us.apache.org/repos/asf/spark/blob/6600786d/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala
index 5eabdf6..080bc87 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala
@@ -51,4 +51,17 @@ class TransformedDStream[U: ClassTag] (
     }
     Some(transformedRDD)
   }
+
+  /**
+   * Wrap a body of code such that the call site and operation scope
+   * information are passed to the RDDs created in this body properly.
+   * This has been overriden to make sure that `displayInnerRDDOps` is always `true`, that is,
+   * the inner scopes and callsites of RDDs generated in `DStream.transform` are always
+   * displayed in the UI.
+   */
+  override protected[streaming] def createRDDWithLocalProperties[U](
+      time: Time,
+      displayInnerRDDOps: Boolean)(body: => U): U = {
+    super.createRDDWithLocalProperties(time, displayInnerRDDOps = true)(body)
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/6600786d/streaming/src/test/scala/org/apache/spark/streaming/DStreamScopeSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/DStreamScopeSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/DStreamScopeSuite.scala
index 8844c9d..bc223e6 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/DStreamScopeSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/DStreamScopeSuite.scala
@@ -17,12 +17,15 @@
 
 package org.apache.spark.streaming
 
+import scala.collection.mutable.ArrayBuffer
+
 import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}
 
-import org.apache.spark.{SparkContext, SparkFunSuite}
-import org.apache.spark.rdd.RDDOperationScope
+import org.apache.spark.rdd.{RDD, RDDOperationScope}
 import org.apache.spark.streaming.dstream.DStream
 import org.apache.spark.streaming.ui.UIUtils
+import org.apache.spark.util.ManualClock
+import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite}
 
 /**
  * Tests whether scope information is passed from DStream operations to RDDs correctly.
@@ -32,7 +35,9 @@ class DStreamScopeSuite extends SparkFunSuite with BeforeAndAfter with BeforeAnd
   private val batchDuration: Duration = Seconds(1)
 
   override def beforeAll(): Unit = {
-    ssc = new StreamingContext(new SparkContext("local", "test"), batchDuration)
+    val conf = new SparkConf().setMaster("local").setAppName("test")
+    conf.set("spark.streaming.clock", classOf[ManualClock].getName())
+    ssc = new StreamingContext(new SparkContext(conf), batchDuration)
   }
 
   override def afterAll(): Unit = {
@@ -103,6 +108,8 @@ class DStreamScopeSuite extends SparkFunSuite with BeforeAndAfter with BeforeAnd
 
   test("scoping nested operations") {
     val inputStream = new DummyInputDStream(ssc)
+    // countByKeyAndWindow internally uses reduceByKeyAndWindow, but only countByKeyAndWindow
+    // should appear in scope
     val countStream = inputStream.countByWindow(Seconds(10), Seconds(1))
     countStream.initialize(Time(0))
 
@@ -137,6 +144,57 @@ class DStreamScopeSuite extends SparkFunSuite with BeforeAndAfter with BeforeAnd
     testStream(countStream)
   }
 
+  test("transform should allow RDD operations to be captured in scopes") {
+    val inputStream = new DummyInputDStream(ssc)
+    val transformedStream = inputStream.transform { _.map { _ -> 1}.reduceByKey(_ + _) }
+    transformedStream.initialize(Time(0))
+
+    val transformScopeBase = transformedStream.baseScope.map(RDDOperationScope.fromJson)
+    val transformScope1 = transformedStream.getOrCompute(Time(1000)).get.scope
+    val transformScope2 = transformedStream.getOrCompute(Time(2000)).get.scope
+    val transformScope3 = transformedStream.getOrCompute(Time(3000)).get.scope
+
+    // Assert that all children RDDs inherit the DStream operation name correctly
+    assertDefined(transformScopeBase, transformScope1, transformScope2, transformScope3)
+    assert(transformScopeBase.get.name === "transform")
+    assertNestedScopeCorrect(transformScope1.get, 1000)
+    assertNestedScopeCorrect(transformScope2.get, 2000)
+    assertNestedScopeCorrect(transformScope3.get, 3000)
+
+    def assertNestedScopeCorrect(rddScope: RDDOperationScope, batchTime: Long): Unit = {
+      assert(rddScope.name === "reduceByKey")
+      assert(rddScope.parent.isDefined)
+      assertScopeCorrect(transformScopeBase.get, rddScope.parent.get, batchTime)
+    }
+  }
+
+  test("foreachRDD should allow RDD operations to be captured in scope") {
+    val inputStream = new DummyInputDStream(ssc)
+    val generatedRDDs = new ArrayBuffer[RDD[(Int, Int)]]
+    inputStream.foreachRDD { rdd =>
+      generatedRDDs += rdd.map { _ -> 1}.reduceByKey(_ + _)
+    }
+    val batchCounter = new BatchCounter(ssc)
+    ssc.start()
+    val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
+    clock.advance(3000)
+    batchCounter.waitUntilBatchesCompleted(3, 10000)
+    assert(generatedRDDs.size === 3)
+
+    val foreachBaseScope =
+      ssc.graph.getOutputStreams().head.baseScope.map(RDDOperationScope.fromJson)
+    assertDefined(foreachBaseScope)
+    assert(foreachBaseScope.get.name === "foreachRDD")
+
+    val rddScopes = generatedRDDs.map { _.scope }
+    assertDefined(rddScopes: _*)
+    rddScopes.zipWithIndex.foreach { case (rddScope, idx) =>
+      assert(rddScope.get.name === "reduceByKey")
+      assert(rddScope.get.parent.isDefined)
+      assertScopeCorrect(foreachBaseScope.get, rddScope.get.parent.get, (idx + 1) * 1000)
+    }
+  }
+
   /** Assert that the RDD operation scope properties are not set in our SparkContext. */
   private def assertPropertiesNotSet(): Unit = {
     assert(ssc != null)
@@ -149,19 +207,12 @@ class DStreamScopeSuite extends SparkFunSuite with BeforeAndAfter with BeforeAnd
       baseScope: RDDOperationScope,
       rddScope: RDDOperationScope,
       batchTime: Long): Unit = {
-    assertScopeCorrect(baseScope.id, baseScope.name, rddScope, batchTime)
-  }
-
-  /** Assert that the given RDD scope inherits the base name and ID correctly. */
-  private def assertScopeCorrect(
-      baseScopeId: String,
-      baseScopeName: String,
-      rddScope: RDDOperationScope,
-      batchTime: Long): Unit = {
+    val (baseScopeId, baseScopeName) = (baseScope.id, baseScope.name)
     val formattedBatchTime = UIUtils.formatBatchTime(
       batchTime, ssc.graph.batchDuration.milliseconds, showYYYYMMSS = false)
     assert(rddScope.id === s"${baseScopeId}_$batchTime")
     assert(rddScope.name.replaceAll("\\n", " ") === s"$baseScopeName @ $formattedBatchTime")
+    assert(rddScope.parent.isEmpty)  // There should not be any higher scope
   }
 
   /** Assert that all the specified options are defined. */

http://git-wip-us.apache.org/repos/asf/spark/blob/6600786d/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
index 0d58a7b..a45c92d 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
@@ -98,7 +98,7 @@ class TestOutputStream[T: ClassTag](
   ) extends ForEachDStream[T](parent, (rdd: RDD[T], t: Time) => {
     val collected = rdd.collect()
     output += collected
-  }) {
+  }, false) {
 
   // This is to clear the output buffer every it is read from a checkpoint
   @throws(classOf[IOException])
@@ -122,7 +122,7 @@ class TestOutputStreamWithPartitions[T: ClassTag](
   extends ForEachDStream[T](parent, (rdd: RDD[T], t: Time) => {
     val collected = rdd.glom().collect().map(_.toSeq)
     output += collected
-  }) {
+  }, false) {
 
   // This is to clear the output buffer every it is read from a checkpoint
   @throws(classOf[IOException])

