Repository: spark
Updated Branches:
  refs/heads/master b3fef50e2 -> 729952a5e


[SPARK-1853] Show Streaming application code context (file, line number) in 
Spark Stages UI

This is a refactored version of the original PR 
https://github.com/apache/spark/pull/1723 by mubarak

Please take a look andrewor14, mubarak

Author: Mubarak Seyed <mubarak.se...@gmail.com>
Author: Tathagata Das <tathagata.das1...@gmail.com>

Closes #2464 from tdas/streaming-callsite and squashes the following commits:

dc54c71 [Tathagata Das] Made changes based on PR comments.
390b45d [Tathagata Das] Fixed minor bugs.
904cd92 [Tathagata Das] Merge remote-tracking branch 'apache-github/master' 
into streaming-callsite
7baa427 [Tathagata Das] Refactored getCallSite and setCallSite to make it 
simpler. Also added unit test for DStream creation site.
b9ed945 [Mubarak Seyed] Adding streaming utils
c461cf4 [Mubarak Seyed] Merge remote-tracking branch 'upstream/master'
ceb43da [Mubarak Seyed] Changing default regex function name
8c5d443 [Mubarak Seyed] Merge remote-tracking branch 'upstream/master'
196121b [Mubarak Seyed] Merge remote-tracking branch 'upstream/master'
491a1eb [Mubarak Seyed] Removing streaming visibility from 
getRDDCreationCallSite in DStream
33a7295 [Mubarak Seyed] Fixing review comments: Merging both setCallSite methods
c26d933 [Mubarak Seyed] Merge remote-tracking branch 'upstream/master'
f51fd9f [Mubarak Seyed] Fixing scalastyle, Regex for Utils.getCallSite, and 
changing method names in DStream
5051c58 [Mubarak Seyed] Getting return value of compute() into variable and 
call setCallSite(prevCallSite) only once. Adding return for other code paths 
(for None)
a207eb7 [Mubarak Seyed] Fixing code review comments
ccde038 [Mubarak Seyed] Removing Utils import from MappedDStream
2a09ad6 [Mubarak Seyed] Changes in Utils.scala for SPARK-1853
1d90cc3 [Mubarak Seyed] Changes for SPARK-1853
5f3105a [Mubarak Seyed] Merge remote-tracking branch 'upstream/master'
70f494f [Mubarak Seyed] Changes for SPARK-1853
1500deb [Mubarak Seyed] Changes in Spark Streaming UI
9d38d3c [Mubarak Seyed] [SPARK-1853] Show Streaming application code context 
(file, line number) in Spark Stages UI
d466d75 [Mubarak Seyed] Changes for spark streaming UI


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/729952a5
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/729952a5
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/729952a5

Branch: refs/heads/master
Commit: 729952a5efce755387c76cdf29280ee6f49fdb72
Parents: b3fef50
Author: Mubarak Seyed <mubarak.se...@gmail.com>
Authored: Tue Sep 23 15:09:12 2014 -0700
Committer: Andrew Or <andrewo...@gmail.com>
Committed: Tue Sep 23 15:09:12 2014 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/SparkContext.scala   | 32 +++++--
 .../main/scala/org/apache/spark/rdd/RDD.scala   |  7 +-
 .../scala/org/apache/spark/util/Utils.scala     | 27 ++++--
 .../spark/streaming/StreamingContext.scala      |  4 +-
 .../spark/streaming/dstream/DStream.scala       | 96 +++++++++++++-------
 .../spark/streaming/StreamingContextSuite.scala | 45 ++++++++-
 6 files changed, 153 insertions(+), 58 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/729952a5/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala 
b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 428f019..979d178 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1030,28 +1030,40 @@ class SparkContext(config: SparkConf) extends Logging {
   }
 
   /**
-   * Support function for API backtraces.
+   * Set the thread-local property for overriding the call sites
+   * of actions and RDDs.
    */
-  def setCallSite(site: String) {
-    setLocalProperty("externalCallSite", site)
+  def setCallSite(shortCallSite: String) {
+    setLocalProperty(CallSite.SHORT_FORM, shortCallSite)
   }
 
   /**
-   * Support function for API backtraces.
+   * Set the thread-local property for overriding the call sites
+   * of actions and RDDs.
+   */
+  private[spark] def setCallSite(callSite: CallSite) {
+    setLocalProperty(CallSite.SHORT_FORM, callSite.shortForm)
+    setLocalProperty(CallSite.LONG_FORM, callSite.longForm)
+  }
+
+  /**
+   * Clear the thread-local property for overriding the call sites
+   * of actions and RDDs.
    */
   def clearCallSite() {
-    setLocalProperty("externalCallSite", null)
+    setLocalProperty(CallSite.SHORT_FORM, null)
+    setLocalProperty(CallSite.LONG_FORM, null)
   }
 
   /**
    * Capture the current user callsite and return a formatted version for 
printing. If the user
-   * has overridden the call site, this will return the user's version.
+   * has overridden the call site using `setCallSite()`, this will return the 
user's version.
    */
   private[spark] def getCallSite(): CallSite = {
-    Option(getLocalProperty("externalCallSite")) match {
-      case Some(callSite) => CallSite(callSite, longForm = "")
-      case None => Utils.getCallSite
-    }
+    Option(getLocalProperty(CallSite.SHORT_FORM)).map { case shortCallSite =>
+      val longCallSite = 
Option(getLocalProperty(CallSite.LONG_FORM)).getOrElse("")
+      CallSite(shortCallSite, longCallSite)
+    }.getOrElse(Utils.getCallSite())
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/729952a5/core/src/main/scala/org/apache/spark/rdd/RDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala 
b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index a9b905b..0e90caa 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.rdd
 
-import java.util.Random
+import java.util.{Properties, Random}
 
 import scala.collection.{mutable, Map}
 import scala.collection.mutable.ArrayBuffer
@@ -41,7 +41,7 @@ import org.apache.spark.partial.CountEvaluator
 import org.apache.spark.partial.GroupedCountEvaluator
 import org.apache.spark.partial.PartialResult
 import org.apache.spark.storage.StorageLevel
-import org.apache.spark.util.{BoundedPriorityQueue, Utils}
+import org.apache.spark.util.{BoundedPriorityQueue, Utils, CallSite}
 import org.apache.spark.util.collection.OpenHashMap
 import org.apache.spark.util.random.{BernoulliSampler, PoissonSampler, 
SamplingUtils}
 
@@ -1224,7 +1224,8 @@ abstract class RDD[T: ClassTag](
   private var storageLevel: StorageLevel = StorageLevel.NONE
 
   /** User code that created this RDD (e.g. `textFile`, `parallelize`). */
-  @transient private[spark] val creationSite = Utils.getCallSite
+  @transient private[spark] val creationSite = sc.getCallSite()
+
   private[spark] def getCreationSite: String = 
Option(creationSite).map(_.shortForm).getOrElse("")
 
   private[spark] def elementClassTag: ClassTag[T] = classTag[T]

http://git-wip-us.apache.org/repos/asf/spark/blob/729952a5/core/src/main/scala/org/apache/spark/util/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala 
b/core/src/main/scala/org/apache/spark/util/Utils.scala
index ed06384..2755887 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -49,6 +49,11 @@ import org.apache.spark.serializer.{DeserializationStream, 
SerializationStream,
 /** CallSite represents a place in user code. It can have a short and a long 
form. */
 private[spark] case class CallSite(shortForm: String, longForm: String)
 
+private[spark] object CallSite {
+  val SHORT_FORM = "callSite.short"
+  val LONG_FORM = "callSite.long"
+}
+
 /**
  * Various utility methods used by Spark.
  */
@@ -859,18 +864,26 @@ private[spark] object Utils extends Logging {
     }
   }
 
-  /**
-   * A regular expression to match classes of the "core" Spark API that we 
want to skip when
-   * finding the call site of a method.
-   */
-  private val SPARK_CLASS_REGEX = 
"""^org\.apache\.spark(\.api\.java)?(\.util)?(\.rdd)?\.[A-Z]""".r
+  /** Default filtering function for finding call sites using `getCallSite`. */
+  private def coreExclusionFunction(className: String): Boolean = {
+    // A regular expression to match classes of the "core" Spark API that we 
want to skip when
+    // finding the call site of a method.
+    val SPARK_CORE_CLASS_REGEX = 
"""^org\.apache\.spark(\.api\.java)?(\.util)?(\.rdd)?\.[A-Z]""".r
+    val SCALA_CLASS_REGEX = """^scala""".r
+    val isSparkCoreClass = 
SPARK_CORE_CLASS_REGEX.findFirstIn(className).isDefined
+    val isScalaClass = SCALA_CLASS_REGEX.findFirstIn(className).isDefined
+    // If the class is a Spark internal class or a Scala class, then exclude.
+    isSparkCoreClass || isScalaClass
+  }
 
   /**
    * When called inside a class in the spark package, returns the name of the 
user code class
    * (outside the spark package) that called into Spark, as well as which 
Spark method they called.
    * This is used, for example, to tell users where in their code each RDD got 
created.
+   *
+   * @param skipClass Function that is used to exclude non-user-code classes.
    */
-  def getCallSite: CallSite = {
+  def getCallSite(skipClass: String => Boolean = coreExclusionFunction): 
CallSite = {
     val trace = Thread.currentThread.getStackTrace()
       .filterNot { ste:StackTraceElement =>
         // When running under some profilers, the current stack trace might 
contain some bogus
@@ -891,7 +904,7 @@ private[spark] object Utils extends Logging {
 
     for (el <- trace) {
       if (insideSpark) {
-        if (SPARK_CLASS_REGEX.findFirstIn(el.getClassName).isDefined) {
+        if (skipClass(el.getClassName)) {
           lastSparkMethod = if (el.getMethodName == "<init>") {
             // Spark method is a constructor; get its class name
             el.getClassName.substring(el.getClassName.lastIndexOf('.') + 1)

http://git-wip-us.apache.org/repos/asf/spark/blob/729952a5/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala 
b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index f63560d..5a8eef1 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -35,10 +35,9 @@ import org.apache.spark._
 import org.apache.spark.rdd.RDD
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.dstream._
-import org.apache.spark.streaming.receiver.{ActorSupervisorStrategy, 
ActorReceiver, Receiver}
+import org.apache.spark.streaming.receiver.{ActorReceiver, 
ActorSupervisorStrategy, Receiver}
 import org.apache.spark.streaming.scheduler._
 import org.apache.spark.streaming.ui.{StreamingJobProgressListener, 
StreamingTab}
-import org.apache.spark.util.MetadataCleaner
 
 /**
  * Main entry point for Spark Streaming functionality. It provides methods 
used to create
@@ -448,6 +447,7 @@ class StreamingContext private[streaming] (
       throw new SparkException("StreamingContext has already been stopped")
     }
     validate()
+    sparkContext.setCallSite(DStream.getCreationSite())
     scheduler.start()
     state = Started
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/729952a5/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala 
b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
index e05db23..65f7ccd 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
@@ -23,6 +23,7 @@ import java.io.{IOException, ObjectInputStream, 
ObjectOutputStream}
 import scala.deprecated
 import scala.collection.mutable.HashMap
 import scala.reflect.ClassTag
+import scala.util.matching.Regex
 
 import org.apache.spark.{Logging, SparkException}
 import org.apache.spark.rdd.{BlockRDD, RDD}
@@ -30,7 +31,7 @@ import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming._
 import org.apache.spark.streaming.StreamingContext._
 import org.apache.spark.streaming.scheduler.Job
-import org.apache.spark.util.MetadataCleaner
+import org.apache.spark.util.{CallSite, MetadataCleaner}
 
 /**
  * A Discretized Stream (DStream), the basic abstraction in Spark Streaming, 
is a continuous
@@ -106,6 +107,9 @@ abstract class DStream[T: ClassTag] (
   /** Return the StreamingContext associated with this DStream */
   def context = ssc
 
+  /* Set the creation call site */
+  private[streaming] val creationSite = DStream.getCreationSite()
+
   /** Persist the RDDs of this DStream with the given storage level */
   def persist(level: StorageLevel): DStream[T] = {
     if (this.isInitialized) {
@@ -272,43 +276,41 @@ abstract class DStream[T: ClassTag] (
   }
 
   /**
-   * Retrieve a precomputed RDD of this DStream, or computes the RDD. This is 
an internal
-   * method that should not be called directly.
+   * Get the RDD corresponding to the given time; either retrieve it from cache
+   * or compute-and-cache it.
    */
   private[streaming] def getOrCompute(time: Time): Option[RDD[T]] = {
-    // If this DStream was not initialized (i.e., zeroTime not set), then do it
-    // If RDD was already generated, then retrieve it from HashMap
-    generatedRDDs.get(time) match {
-
-      // If an RDD was already generated and is being reused, then
-      // probably all RDDs in this DStream will be reused and hence should be 
cached
-      case Some(oldRDD) => Some(oldRDD)
-
-      // if RDD was not generated, and if the time is valid
-      // (based on sliding time of this DStream), then generate the RDD
-      case None => {
-        if (isTimeValid(time)) {
-          compute(time) match {
-            case Some(newRDD) =>
-              if (storageLevel != StorageLevel.NONE) {
-                newRDD.persist(storageLevel)
-                logInfo("Persisting RDD " + newRDD.id + " for time " +
-                  time + " to " + storageLevel + " at time " + time)
-              }
-              if (checkpointDuration != null &&
-                (time - zeroTime).isMultipleOf(checkpointDuration)) {
-                newRDD.checkpoint()
-                logInfo("Marking RDD " + newRDD.id + " for time " + time +
-                  " for checkpointing at time " + time)
-              }
-              generatedRDDs.put(time, newRDD)
-              Some(newRDD)
-            case None =>
-              None
+    // If RDD was already generated, then retrieve it from HashMap,
+    // or else compute the RDD
+    generatedRDDs.get(time).orElse {
+      // Compute the RDD if time is valid (e.g. correct time in a sliding 
window)
+      // of RDD generation, else generate nothing.
+      if (isTimeValid(time)) {
+        // Set the thread-local property for call sites to this DStream's 
creation site
+        // such that RDDs generated by compute gets that as their creation 
site.
+        // Note that this `getOrCompute` may get called from another DStream 
which may have
+        // set its own call site. So we store its call site in a temporary 
variable,
+        // set this DStream's creation site, generate RDDs and then restore 
the previous call site.
+        val prevCallSite = ssc.sparkContext.getCallSite()
+        ssc.sparkContext.setCallSite(creationSite)
+        val rddOption = compute(time)
+        ssc.sparkContext.setCallSite(prevCallSite)
+
+        rddOption.foreach { case newRDD =>
+          // Register the generated RDD for caching and checkpointing
+          if (storageLevel != StorageLevel.NONE) {
+            newRDD.persist(storageLevel)
+            logDebug(s"Persisting RDD ${newRDD.id} for time $time to 
$storageLevel")
           }
-        } else {
-          None
+          if (checkpointDuration != null && (time - 
zeroTime).isMultipleOf(checkpointDuration)) {
+            newRDD.checkpoint()
+            logInfo(s"Marking RDD ${newRDD.id} for time $time for 
checkpointing")
+          }
+          generatedRDDs.put(time, newRDD)
         }
+        rddOption
+      } else {
+        None
       }
     }
   }
@@ -799,3 +801,29 @@ abstract class DStream[T: ClassTag] (
     this
   }
 }
+
+private[streaming] object DStream {
+
+  /** Get the creation site of a DStream from the stack trace of when the 
DStream is created. */
+  def getCreationSite(): CallSite = {
+    val SPARK_CLASS_REGEX = """^org\.apache\.spark""".r
+    val SPARK_STREAMING_TESTCLASS_REGEX = 
"""^org\.apache\.spark\.streaming\.test""".r
+    val SPARK_EXAMPLES_CLASS_REGEX = """^org\.apache\.spark\.examples""".r
+    val SCALA_CLASS_REGEX = """^scala""".r
+
+    /** Filtering function that excludes non-user classes for a streaming 
application */
+    def streamingExclustionFunction(className: String): Boolean = {
+      def doesMatch(r: Regex) = r.findFirstIn(className).isDefined
+      val isSparkClass = doesMatch(SPARK_CLASS_REGEX)
+      val isSparkExampleClass = doesMatch(SPARK_EXAMPLES_CLASS_REGEX)
+      val isSparkStreamingTestClass = 
doesMatch(SPARK_STREAMING_TESTCLASS_REGEX)
+      val isScalaClass = doesMatch(SCALA_CLASS_REGEX)
+
+      // If the class is a spark example class or a streaming test class then 
it is considered
+      // as a streaming application class and is not excluded. Otherwise, 
exclude any
+      // Spark or Scala class, as the rest would be streaming 
application classes.
+      (isSparkClass || isScalaClass) && !isSparkExampleClass && 
!isSparkStreamingTestClass
+    }
+    org.apache.spark.util.Utils.getCallSite(streamingExclustionFunction)
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/729952a5/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
 
b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
index a3cabd6..ebf8374 100644
--- 
a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
+++ 
b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
@@ -19,13 +19,16 @@ package org.apache.spark.streaming
 
 import java.util.concurrent.atomic.AtomicInteger
 
+import scala.language.postfixOps
+
 import org.apache.spark.{Logging, SparkConf, SparkContext, SparkException}
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.dstream.DStream
 import org.apache.spark.streaming.receiver.Receiver
-import org.apache.spark.util.{MetadataCleaner, Utils}
-import org.scalatest.{BeforeAndAfter, FunSuite}
+import org.apache.spark.util.Utils
+import org.scalatest.{Assertions, BeforeAndAfter, FunSuite}
 import org.scalatest.concurrent.Timeouts
+import org.scalatest.concurrent.Eventually._
 import org.scalatest.exceptions.TestFailedDueToTimeoutException
 import org.scalatest.time.SpanSugar._
 
@@ -257,6 +260,10 @@ class StreamingContextSuite extends FunSuite with 
BeforeAndAfter with Timeouts w
     assert(exception.getMessage.contains("transform"), "Expected exception not 
thrown")
   }
 
+  test("DStream and generated RDD creation sites") {
+    testPackage.test()
+  }
+
   def addInputStream(s: StreamingContext): DStream[Int] = {
     val input = (1 to 100).map(i => (1 to i))
     val inputStream = new TestInputStream(s, input, 1)
@@ -293,3 +300,37 @@ class TestReceiver extends 
Receiver[Int](StorageLevel.MEMORY_ONLY) with Logging
 object TestReceiver {
   val counter = new AtomicInteger(1)
 }
+
+/** Streaming application for testing DStream and RDD creation sites */
+package object testPackage extends Assertions {
+  def test() {
+    val conf = new SparkConf().setMaster("local").setAppName("CreationSite 
test")
+    val ssc = new StreamingContext(conf , Milliseconds(100))
+    try {
+      val inputStream = ssc.receiverStream(new TestReceiver)
+
+      // Verify creation site of DStream
+      val creationSite = inputStream.creationSite
+      assert(creationSite.shortForm.contains("receiverStream") &&
+        creationSite.shortForm.contains("StreamingContextSuite")
+      )
+      assert(creationSite.longForm.contains("testPackage"))
+
+      // Verify creation site of generated RDDs
+      var rddGenerated = false
+      var rddCreationSiteCorrect = true
+
+      inputStream.foreachRDD { rdd =>
+        rddCreationSiteCorrect = rdd.creationSite == creationSite
+        rddGenerated = true
+      }
+      ssc.start()
+
+      eventually(timeout(10000 millis), interval(10 millis)) {
+        assert(rddGenerated && rddCreationSiteCorrect, "RDD creation site was 
not correct")
+      }
+    } finally {
+      ssc.stop()
+    }
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to