http://git-wip-us.apache.org/repos/asf/spark/blob/863ec0cb/core/src/main/scala/org/apache/spark/rdd/RDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala 
b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 31c07c7..7f7c7ed 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -25,7 +25,7 @@ import scala.language.implicitConversions
 import scala.reflect.{classTag, ClassTag}
 
 import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus
-import org.apache.hadoop.io.{Writable, BytesWritable, NullWritable, Text}
+import org.apache.hadoop.io.{BytesWritable, NullWritable, Text}
 import org.apache.hadoop.io.compress.CompressionCodec
 import org.apache.hadoop.mapred.TextOutputFormat
 
@@ -277,12 +277,20 @@ abstract class RDD[T: ClassTag](
     if (isCheckpointed) firstParent[T].iterator(split, context) else 
compute(split, context)
   }
 
+  /**
+   * Execute a block of code in a scope such that all new RDDs created in this body will
+   * be part of the same scope. For more detail, see {{org.apache.spark.rdd.RDDOperationScope}}.
+   *
+   * Note: Return statements are NOT allowed in the given body.
+   */
+  private[spark] def withScope[U](body: => U): U = RDDOperationScope.withScope[U](sc)(body)
+
   // Transformations (return a new RDD)
 
   /**
    * Return a new RDD by applying a function to all elements of this RDD.
    */
-  def map[U: ClassTag](f: T => U): RDD[U] = {
+  def map[U: ClassTag](f: T => U): RDD[U] = withScope {
     val cleanF = sc.clean(f)
     new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
   }
@@ -291,7 +299,7 @@ abstract class RDD[T: ClassTag](
    *  Return a new RDD by first applying a function to all elements of this
    *  RDD, and then flattening the results.
    */
-  def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = {
+  def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = withScope {
     val cleanF = sc.clean(f)
     new MapPartitionsRDD[U, T](this, (context, pid, iter) => 
iter.flatMap(cleanF))
   }
@@ -299,7 +307,7 @@ abstract class RDD[T: ClassTag](
   /**
    * Return a new RDD containing only the elements that satisfy a predicate.
    */
-  def filter(f: T => Boolean): RDD[T] = {
+  def filter(f: T => Boolean): RDD[T] = withScope {
     val cleanF = sc.clean(f)
     new MapPartitionsRDD[T, T](
       this,
@@ -310,13 +318,16 @@ abstract class RDD[T: ClassTag](
   /**
    * Return a new RDD containing the distinct elements in this RDD.
    */
-  def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] =
+  def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
     map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)
+  }
 
   /**
    * Return a new RDD containing the distinct elements in this RDD.
    */
-  def distinct(): RDD[T] = distinct(partitions.length)
+  def distinct(): RDD[T] = withScope {
+    distinct(partitions.length)
+  }
 
   /**
    * Return a new RDD that has exactly numPartitions partitions.
@@ -327,7 +338,7 @@ abstract class RDD[T: ClassTag](
    * If you are decreasing the number of partitions in this RDD, consider 
using `coalesce`,
    * which can avoid performing a shuffle.
    */
-  def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = {
+  def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
     coalesce(numPartitions, shuffle = true)
   }
 
@@ -352,7 +363,7 @@ abstract class RDD[T: ClassTag](
    * data distributed using a hash partitioner.
    */
   def coalesce(numPartitions: Int, shuffle: Boolean = false)(implicit ord: 
Ordering[T] = null)
-      : RDD[T] = {
+      : RDD[T] = withScope {
     if (shuffle) {
       /** Distributes elements evenly across output partitions, starting from 
a random partition. */
       val distributePartition = (index: Int, items: Iterator[T]) => {
@@ -377,16 +388,17 @@ abstract class RDD[T: ClassTag](
 
   /**
    * Return a sampled subset of this RDD.
-   * 
+   *
    * @param withReplacement can elements be sampled multiple times (replaced 
when sampled out)
    * @param fraction expected size of the sample as a fraction of this RDD's 
size
    *  without replacement: probability that each element is chosen; fraction 
must be [0, 1]
    *  with replacement: expected number of times each element is chosen; 
fraction must be >= 0
    * @param seed seed for the random number generator
    */
-  def sample(withReplacement: Boolean,
+  def sample(
+      withReplacement: Boolean,
       fraction: Double,
-      seed: Long = Utils.random.nextLong): RDD[T] = {
+      seed: Long = Utils.random.nextLong): RDD[T] = withScope {
     require(fraction >= 0.0, "Negative fraction value: " + fraction)
     if (withReplacement) {
       new PartitionwiseSampledRDD[T, T](this, new PoissonSampler[T](fraction), 
true, seed)
@@ -403,7 +415,9 @@ abstract class RDD[T: ClassTag](
    *
    * @return split RDDs in an array
    */
-  def randomSplit(weights: Array[Double], seed: Long = Utils.random.nextLong): 
Array[RDD[T]] = {
+  def randomSplit(
+      weights: Array[Double],
+      seed: Long = Utils.random.nextLong): Array[RDD[T]] = withScope {
     val sum = weights.sum
     val normalizedCumWeights = weights.map(_ / sum).scanLeft(0.0d)(_ + _)
     normalizedCumWeights.sliding(2).map { x =>
@@ -435,7 +449,9 @@ abstract class RDD[T: ClassTag](
    * @param seed seed for the random number generator
    * @return sample of specified size in an array
    */
-  def takeSample(withReplacement: Boolean,
+  // TODO: rewrite this without return statements so we can wrap it in a scope
+  def takeSample(
+      withReplacement: Boolean,
       num: Int,
       seed: Long = Utils.random.nextLong): Array[T] = {
     val numStDev =  10.0
@@ -483,7 +499,7 @@ abstract class RDD[T: ClassTag](
    * Return the union of this RDD and another one. Any identical elements will 
appear multiple
    * times (use `.distinct()` to eliminate them).
    */
-  def union(other: RDD[T]): RDD[T] = {
+  def union(other: RDD[T]): RDD[T] = withScope {
     if (partitioner.isDefined && other.partitioner == partitioner) {
       new PartitionerAwareUnionRDD(sc, Array(this, other))
     } else {
@@ -495,7 +511,9 @@ abstract class RDD[T: ClassTag](
    * Return the union of this RDD and another one. Any identical elements will 
appear multiple
    * times (use `.distinct()` to eliminate them).
    */
-  def ++(other: RDD[T]): RDD[T] = this.union(other)
+  def ++(other: RDD[T]): RDD[T] = withScope {
+    this.union(other)
+  }
 
   /**
    * Return this RDD sorted by the given key function.
@@ -504,10 +522,11 @@ abstract class RDD[T: ClassTag](
       f: (T) => K,
       ascending: Boolean = true,
       numPartitions: Int = this.partitions.length)
-      (implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T] =
+      (implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T] = withScope {
     this.keyBy[K](f)
         .sortByKey(ascending, numPartitions)
         .values
+  }
 
   /**
    * Return the intersection of this RDD and another one. The output will not 
contain any duplicate
@@ -515,7 +534,7 @@ abstract class RDD[T: ClassTag](
    *
    * Note that this method performs a shuffle internally.
    */
-  def intersection(other: RDD[T]): RDD[T] = {
+  def intersection(other: RDD[T]): RDD[T] = withScope {
     this.map(v => (v, null)).cogroup(other.map(v => (v, null)))
         .filter { case (_, (leftGroup, rightGroup)) => leftGroup.nonEmpty && 
rightGroup.nonEmpty }
         .keys
@@ -529,8 +548,9 @@ abstract class RDD[T: ClassTag](
    *
    * @param partitioner Partitioner to use for the resulting RDD
    */
-  def intersection(other: RDD[T], partitioner: Partitioner)(implicit ord: 
Ordering[T] = null)
-      : RDD[T] = {
+  def intersection(
+      other: RDD[T],
+      partitioner: Partitioner)(implicit ord: Ordering[T] = null): RDD[T] = 
withScope {
     this.map(v => (v, null)).cogroup(other.map(v => (v, null)), partitioner)
         .filter { case (_, (leftGroup, rightGroup)) => leftGroup.nonEmpty && 
rightGroup.nonEmpty }
         .keys
@@ -544,16 +564,14 @@ abstract class RDD[T: ClassTag](
    *
    * @param numPartitions How many partitions to use in the resulting RDD
    */
-  def intersection(other: RDD[T], numPartitions: Int): RDD[T] = {
-    this.map(v => (v, null)).cogroup(other.map(v => (v, null)), new 
HashPartitioner(numPartitions))
-        .filter { case (_, (leftGroup, rightGroup)) => leftGroup.nonEmpty && 
rightGroup.nonEmpty }
-        .keys
+  def intersection(other: RDD[T], numPartitions: Int): RDD[T] = withScope {
+    intersection(other, new HashPartitioner(numPartitions))
   }
 
   /**
    * Return an RDD created by coalescing all elements within each partition 
into an array.
    */
-  def glom(): RDD[Array[T]] = {
+  def glom(): RDD[Array[T]] = withScope {
     new MapPartitionsRDD[Array[T], T](this, (context, pid, iter) => 
Iterator(iter.toArray))
   }
 
@@ -561,7 +579,9 @@ abstract class RDD[T: ClassTag](
    * Return the Cartesian product of this RDD and another one, that is, the 
RDD of all pairs of
    * elements (a, b) where a is in `this` and b is in `other`.
    */
-  def cartesian[U: ClassTag](other: RDD[U]): RDD[(T, U)] = new 
CartesianRDD(sc, this, other)
+  def cartesian[U: ClassTag](other: RDD[U]): RDD[(T, U)] = withScope {
+    new CartesianRDD(sc, this, other)
+  }
 
   /**
    * Return an RDD of grouped items. Each group consists of a key and a 
sequence of elements
@@ -572,8 +592,9 @@ abstract class RDD[T: ClassTag](
    * aggregation (such as a sum or average) over each key, using 
[[PairRDDFunctions.aggregateByKey]]
    * or [[PairRDDFunctions.reduceByKey]] will provide much better performance.
    */
-  def groupBy[K](f: T => K)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])] =
+  def groupBy[K](f: T => K)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])] = 
withScope {
     groupBy[K](f, defaultPartitioner(this))
+  }
 
   /**
    * Return an RDD of grouped elements. Each group consists of a key and a 
sequence of elements
@@ -584,8 +605,11 @@ abstract class RDD[T: ClassTag](
    * aggregation (such as a sum or average) over each key, using 
[[PairRDDFunctions.aggregateByKey]]
    * or [[PairRDDFunctions.reduceByKey]] will provide much better performance.
    */
-  def groupBy[K](f: T => K, numPartitions: Int)(implicit kt: ClassTag[K]): 
RDD[(K, Iterable[T])] =
+  def groupBy[K](
+      f: T => K,
+      numPartitions: Int)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])] = 
withScope {
     groupBy(f, new HashPartitioner(numPartitions))
+  }
 
   /**
    * Return an RDD of grouped items. Each group consists of a key and a 
sequence of elements
@@ -597,7 +621,7 @@ abstract class RDD[T: ClassTag](
    * or [[PairRDDFunctions.reduceByKey]] will provide much better performance.
    */
   def groupBy[K](f: T => K, p: Partitioner)(implicit kt: ClassTag[K], ord: 
Ordering[K] = null)
-      : RDD[(K, Iterable[T])] = {
+      : RDD[(K, Iterable[T])] = withScope {
     val cleanF = sc.clean(f)
     this.map(t => (cleanF(t), t)).groupByKey(p)
   }
@@ -605,13 +629,16 @@ abstract class RDD[T: ClassTag](
   /**
    * Return an RDD created by piping elements to a forked external process.
    */
-  def pipe(command: String): RDD[String] = new PipedRDD(this, command)
+  def pipe(command: String): RDD[String] = withScope {
+    new PipedRDD(this, command)
+  }
 
   /**
    * Return an RDD created by piping elements to a forked external process.
    */
-  def pipe(command: String, env: Map[String, String]): RDD[String] =
+  def pipe(command: String, env: Map[String, String]): RDD[String] = withScope {
     new PipedRDD(this, command, env)
+  }
 
   /**
    * Return an RDD created by piping elements to a forked external process.
@@ -619,7 +646,7 @@ abstract class RDD[T: ClassTag](
    *
    * @param command command to run in forked process.
    * @param env environment variables to set.
-   * @param printPipeContext Before piping elements, this function is called 
as an oppotunity
+   * @param printPipeContext Before piping elements, this function is called 
as an opportunity
    *                         to pipe context data. Print line function (like 
out.println) will be
    *                         passed as printPipeContext's parameter.
    * @param printRDDElement Use this function to customize how to pipe 
elements. This function
@@ -637,7 +664,7 @@ abstract class RDD[T: ClassTag](
       env: Map[String, String] = Map(),
       printPipeContext: (String => Unit) => Unit = null,
       printRDDElement: (T, String => Unit) => Unit = null,
-      separateWorkingDir: Boolean = false): RDD[String] = {
+      separateWorkingDir: Boolean = false): RDD[String] = withScope {
     new PipedRDD(this, command, env,
       if (printPipeContext ne null) sc.clean(printPipeContext) else null,
       if (printRDDElement ne null) sc.clean(printRDDElement) else null,
@@ -651,7 +678,7 @@ abstract class RDD[T: ClassTag](
    * should be `false` unless this is a pair RDD and the input function 
doesn't modify the keys.
    */
   def mapPartitions[U: ClassTag](
-      f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean = false): RDD[U] = {
+      f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean = false): RDD[U] = withScope {
     val func = (context: TaskContext, index: Int, iter: Iterator[T]) => f(iter)
     new MapPartitionsRDD(this, sc.clean(func), preservesPartitioning)
   }
@@ -664,7 +691,8 @@ abstract class RDD[T: ClassTag](
    * should be `false` unless this is a pair RDD and the input function 
doesn't modify the keys.
    */
   def mapPartitionsWithIndex[U: ClassTag](
-      f: (Int, Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = 
false): RDD[U] = {
+      f: (Int, Iterator[T]) => Iterator[U],
+      preservesPartitioning: Boolean = false): RDD[U] = withScope {
     val func = (context: TaskContext, index: Int, iter: Iterator[T]) => 
f(index, iter)
     new MapPartitionsRDD(this, sc.clean(func), preservesPartitioning)
   }
@@ -681,7 +709,7 @@ abstract class RDD[T: ClassTag](
   @deprecated("use TaskContext.get", "1.2.0")
   def mapPartitionsWithContext[U: ClassTag](
       f: (TaskContext, Iterator[T]) => Iterator[U],
-      preservesPartitioning: Boolean = false): RDD[U] = {
+      preservesPartitioning: Boolean = false): RDD[U] = withScope {
     val func = (context: TaskContext, index: Int, iter: Iterator[T]) => 
f(context, iter)
     new MapPartitionsRDD(this, sc.clean(func), preservesPartitioning)
   }
@@ -692,7 +720,8 @@ abstract class RDD[T: ClassTag](
    */
   @deprecated("use mapPartitionsWithIndex", "0.7.0")
   def mapPartitionsWithSplit[U: ClassTag](
-      f: (Int, Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = 
false): RDD[U] = {
+      f: (Int, Iterator[T]) => Iterator[U],
+      preservesPartitioning: Boolean = false): RDD[U] = withScope {
     mapPartitionsWithIndex(f, preservesPartitioning)
   }
 
@@ -704,7 +733,7 @@ abstract class RDD[T: ClassTag](
   @deprecated("use mapPartitionsWithIndex", "1.0.0")
   def mapWith[A, U: ClassTag]
       (constructA: Int => A, preservesPartitioning: Boolean = false)
-      (f: (T, A) => U): RDD[U] = {
+      (f: (T, A) => U): RDD[U] = withScope {
     mapPartitionsWithIndex((index, iter) => {
       val a = constructA(index)
       iter.map(t => f(t, a))
@@ -719,7 +748,7 @@ abstract class RDD[T: ClassTag](
   @deprecated("use mapPartitionsWithIndex and flatMap", "1.0.0")
   def flatMapWith[A, U: ClassTag]
       (constructA: Int => A, preservesPartitioning: Boolean = false)
-      (f: (T, A) => Seq[U]): RDD[U] = {
+      (f: (T, A) => Seq[U]): RDD[U] = withScope {
     mapPartitionsWithIndex((index, iter) => {
       val a = constructA(index)
       iter.flatMap(t => f(t, a))
@@ -732,11 +761,11 @@ abstract class RDD[T: ClassTag](
    * partition with the index of that partition.
    */
   @deprecated("use mapPartitionsWithIndex and foreach", "1.0.0")
-  def foreachWith[A](constructA: Int => A)(f: (T, A) => Unit) {
+  def foreachWith[A](constructA: Int => A)(f: (T, A) => Unit): Unit = withScope {
     mapPartitionsWithIndex { (index, iter) =>
       val a = constructA(index)
       iter.map(t => {f(t, a); t})
-    }.foreach(_ => {})
+    }
   }
 
   /**
@@ -745,7 +774,7 @@ abstract class RDD[T: ClassTag](
    * partition with the index of that partition.
    */
   @deprecated("use mapPartitionsWithIndex and filter", "1.0.0")
-  def filterWith[A](constructA: Int => A)(p: (T, A) => Boolean): RDD[T] = {
+  def filterWith[A](constructA: Int => A)(p: (T, A) => Boolean): RDD[T] = withScope {
     mapPartitionsWithIndex((index, iter) => {
       val a = constructA(index)
       iter.filter(t => p(t, a))
@@ -758,7 +787,7 @@ abstract class RDD[T: ClassTag](
    * partitions* and the *same number of elements in each partition* (e.g. one 
was made through
    * a map on the other).
    */
-  def zip[U: ClassTag](other: RDD[U]): RDD[(T, U)] = {
+  def zip[U: ClassTag](other: RDD[U]): RDD[(T, U)] = withScope {
     zipPartitions(other, preservesPartitioning = false) { (thisIter, 
otherIter) =>
       new Iterator[(T, U)] {
         def hasNext: Boolean = (thisIter.hasNext, otherIter.hasNext) match {
@@ -780,33 +809,39 @@ abstract class RDD[T: ClassTag](
    */
   def zipPartitions[B: ClassTag, V: ClassTag]
       (rdd2: RDD[B], preservesPartitioning: Boolean)
-      (f: (Iterator[T], Iterator[B]) => Iterator[V]): RDD[V] =
+      (f: (Iterator[T], Iterator[B]) => Iterator[V]): RDD[V] = withScope {
     new ZippedPartitionsRDD2(sc, sc.clean(f), this, rdd2, 
preservesPartitioning)
+  }
 
   def zipPartitions[B: ClassTag, V: ClassTag]
       (rdd2: RDD[B])
-      (f: (Iterator[T], Iterator[B]) => Iterator[V]): RDD[V] =
-    new ZippedPartitionsRDD2(sc, sc.clean(f), this, rdd2, false)
+      (f: (Iterator[T], Iterator[B]) => Iterator[V]): RDD[V] = withScope {
+    zipPartitions(rdd2, preservesPartitioning = false)(f)
+  }
 
   def zipPartitions[B: ClassTag, C: ClassTag, V: ClassTag]
       (rdd2: RDD[B], rdd3: RDD[C], preservesPartitioning: Boolean)
-      (f: (Iterator[T], Iterator[B], Iterator[C]) => Iterator[V]): RDD[V] =
+      (f: (Iterator[T], Iterator[B], Iterator[C]) => Iterator[V]): RDD[V] = 
withScope {
     new ZippedPartitionsRDD3(sc, sc.clean(f), this, rdd2, rdd3, 
preservesPartitioning)
+  }
 
   def zipPartitions[B: ClassTag, C: ClassTag, V: ClassTag]
       (rdd2: RDD[B], rdd3: RDD[C])
-      (f: (Iterator[T], Iterator[B], Iterator[C]) => Iterator[V]): RDD[V] =
-    new ZippedPartitionsRDD3(sc, sc.clean(f), this, rdd2, rdd3, false)
+      (f: (Iterator[T], Iterator[B], Iterator[C]) => Iterator[V]): RDD[V] = 
withScope {
+    zipPartitions(rdd2, rdd3, preservesPartitioning = false)(f)
+  }
 
   def zipPartitions[B: ClassTag, C: ClassTag, D: ClassTag, V: ClassTag]
       (rdd2: RDD[B], rdd3: RDD[C], rdd4: RDD[D], preservesPartitioning: 
Boolean)
-      (f: (Iterator[T], Iterator[B], Iterator[C], Iterator[D]) => 
Iterator[V]): RDD[V] =
+      (f: (Iterator[T], Iterator[B], Iterator[C], Iterator[D]) => 
Iterator[V]): RDD[V] = withScope {
     new ZippedPartitionsRDD4(sc, sc.clean(f), this, rdd2, rdd3, rdd4, 
preservesPartitioning)
+  }
 
   def zipPartitions[B: ClassTag, C: ClassTag, D: ClassTag, V: ClassTag]
       (rdd2: RDD[B], rdd3: RDD[C], rdd4: RDD[D])
-      (f: (Iterator[T], Iterator[B], Iterator[C], Iterator[D]) => 
Iterator[V]): RDD[V] =
-    new ZippedPartitionsRDD4(sc, sc.clean(f), this, rdd2, rdd3, rdd4, false)
+      (f: (Iterator[T], Iterator[B], Iterator[C], Iterator[D]) => 
Iterator[V]): RDD[V] = withScope {
+    zipPartitions(rdd2, rdd3, rdd4, preservesPartitioning = false)(f)
+  }
 
 
   // Actions (launch a job to return a value to the user program)
@@ -814,7 +849,7 @@ abstract class RDD[T: ClassTag](
   /**
    * Applies a function f to all elements of this RDD.
    */
-  def foreach(f: T => Unit) {
+  def foreach(f: T => Unit): Unit = withScope {
     val cleanF = sc.clean(f)
     sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))
   }
@@ -822,7 +857,7 @@ abstract class RDD[T: ClassTag](
   /**
    * Applies a function f to each partition of this RDD.
    */
-  def foreachPartition(f: Iterator[T] => Unit) {
+  def foreachPartition(f: Iterator[T] => Unit): Unit = withScope {
     val cleanF = sc.clean(f)
     sc.runJob(this, (iter: Iterator[T]) => cleanF(iter))
   }
@@ -830,7 +865,7 @@ abstract class RDD[T: ClassTag](
   /**
    * Return an array that contains all of the elements in this RDD.
    */
-  def collect(): Array[T] = {
+  def collect(): Array[T] = withScope {
     val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
     Array.concat(results: _*)
   }
@@ -840,7 +875,7 @@ abstract class RDD[T: ClassTag](
    *
    * The iterator will consume as much memory as the largest partition in this 
RDD.
    */
-  def toLocalIterator: Iterator[T] = {
+  def toLocalIterator: Iterator[T] = withScope {
     def collectPartition(p: Int): Array[T] = {
       sc.runJob(this, (iter: Iterator[T]) => iter.toArray, Seq(p), allowLocal 
= false).head
     }
@@ -851,12 +886,14 @@ abstract class RDD[T: ClassTag](
    * Return an array that contains all of the elements in this RDD.
    */
   @deprecated("use collect", "1.0.0")
-  def toArray(): Array[T] = collect()
+  def toArray(): Array[T] = withScope {
+    collect()
+  }
 
   /**
    * Return an RDD that contains all matching values by applying `f`.
    */
-  def collect[U: ClassTag](f: PartialFunction[T, U]): RDD[U] = {
+  def collect[U: ClassTag](f: PartialFunction[T, U]): RDD[U] = withScope {
     filter(f.isDefinedAt).map(f)
   }
 
@@ -866,19 +903,23 @@ abstract class RDD[T: ClassTag](
    * Uses `this` partitioner/partition size, because even if `other` is huge, 
the resulting
    * RDD will be <= us.
    */
-  def subtract(other: RDD[T]): RDD[T] =
+  def subtract(other: RDD[T]): RDD[T] = withScope {
     subtract(other, partitioner.getOrElse(new 
HashPartitioner(partitions.length)))
+  }
 
   /**
    * Return an RDD with the elements from `this` that are not in `other`.
    */
-  def subtract(other: RDD[T], numPartitions: Int): RDD[T] =
+  def subtract(other: RDD[T], numPartitions: Int): RDD[T] = withScope {
     subtract(other, new HashPartitioner(numPartitions))
+  }
 
   /**
    * Return an RDD with the elements from `this` that are not in `other`.
    */
-  def subtract(other: RDD[T], p: Partitioner)(implicit ord: Ordering[T] = 
null): RDD[T] = {
+  def subtract(
+      other: RDD[T],
+      p: Partitioner)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
     if (partitioner == Some(p)) {
       // Our partitioner knows how to handle T (which, since we have a 
partitioner, is
       // really (K, V)) so make a new Partitioner that will de-tuple our fake 
tuples
@@ -900,7 +941,7 @@ abstract class RDD[T: ClassTag](
    * Reduces the elements of this RDD using the specified commutative and
    * associative binary operator.
    */
-  def reduce(f: (T, T) => T): T = {
+  def reduce(f: (T, T) => T): T = withScope {
     val cleanF = sc.clean(f)
     val reducePartition: Iterator[T] => Option[T] = iter => {
       if (iter.hasNext) {
@@ -929,7 +970,7 @@ abstract class RDD[T: ClassTag](
    * @param depth suggested depth of the tree (default: 2)
    * @see [[org.apache.spark.rdd.RDD#reduce]]
    */
-  def treeReduce(f: (T, T) => T, depth: Int = 2): T = {
+  def treeReduce(f: (T, T) => T, depth: Int = 2): T = withScope {
     require(depth >= 1, s"Depth must be greater than or equal to 1 but got 
$depth.")
     val cleanF = context.clean(f)
     val reducePartition: Iterator[T] => Option[T] = iter => {
@@ -961,7 +1002,7 @@ abstract class RDD[T: ClassTag](
    * modify t1 and return it as its result value to avoid object allocation; 
however, it should not
    * modify t2.
    */
-  def fold(zeroValue: T)(op: (T, T) => T): T = {
+  def fold(zeroValue: T)(op: (T, T) => T): T = withScope {
     // Clone the zero value since we will also be serializing it as part of 
tasks
     var jobResult = Utils.clone(zeroValue, 
sc.env.closureSerializer.newInstance())
     val cleanOp = sc.clean(op)
@@ -979,7 +1020,7 @@ abstract class RDD[T: ClassTag](
    * allowed to modify and return their first argument instead of creating a 
new U to avoid memory
    * allocation.
    */
-  def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U = {
+  def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U = withScope {
     // Clone the zero value since we will also be serializing it as part of 
tasks
     var jobResult = Utils.clone(zeroValue, sc.env.serializer.newInstance())
     val cleanSeqOp = sc.clean(seqOp)
@@ -999,26 +1040,29 @@ abstract class RDD[T: ClassTag](
   def treeAggregate[U: ClassTag](zeroValue: U)(
       seqOp: (U, T) => U,
       combOp: (U, U) => U,
-      depth: Int = 2): U = {
+      depth: Int = 2): U = withScope {
     require(depth >= 1, s"Depth must be greater than or equal to 1 but got 
$depth.")
     if (partitions.length == 0) {
-      return Utils.clone(zeroValue, 
context.env.closureSerializer.newInstance())
-    }
-    val cleanSeqOp = context.clean(seqOp)
-    val cleanCombOp = context.clean(combOp)
-    val aggregatePartition = (it: Iterator[T]) => 
it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp)
-    var partiallyAggregated = mapPartitions(it => 
Iterator(aggregatePartition(it)))
-    var numPartitions = partiallyAggregated.partitions.length
-    val scale = math.max(math.ceil(math.pow(numPartitions, 1.0 / 
depth)).toInt, 2)
-    // If creating an extra level doesn't help reduce the wall-clock time, we 
stop tree aggregation.
-    while (numPartitions > scale + numPartitions / scale) {
-      numPartitions /= scale
-      val curNumPartitions = numPartitions
-      partiallyAggregated = partiallyAggregated.mapPartitionsWithIndex { (i, 
iter) =>
-        iter.map((i % curNumPartitions, _))
-      }.reduceByKey(new HashPartitioner(curNumPartitions), cleanCombOp).values
+      Utils.clone(zeroValue, context.env.closureSerializer.newInstance())
+    } else {
+      val cleanSeqOp = context.clean(seqOp)
+      val cleanCombOp = context.clean(combOp)
+      val aggregatePartition =
+        (it: Iterator[T]) => it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp)
+      var partiallyAggregated = mapPartitions(it => 
Iterator(aggregatePartition(it)))
+      var numPartitions = partiallyAggregated.partitions.length
+      val scale = math.max(math.ceil(math.pow(numPartitions, 1.0 / 
depth)).toInt, 2)
+      // If creating an extra level doesn't help reduce
+      // the wall-clock time, we stop tree aggregation.
+      while (numPartitions > scale + numPartitions / scale) {
+        numPartitions /= scale
+        val curNumPartitions = numPartitions
+        partiallyAggregated = partiallyAggregated.mapPartitionsWithIndex {
+          (i, iter) => iter.map((i % curNumPartitions, _))
+        }.reduceByKey(new HashPartitioner(curNumPartitions), 
cleanCombOp).values
+      }
+      partiallyAggregated.reduce(cleanCombOp)
     }
-    partiallyAggregated.reduce(cleanCombOp)
   }
 
   /**
@@ -1032,7 +1076,9 @@ abstract class RDD[T: ClassTag](
    * within a timeout, even if not all tasks have finished.
    */
   @Experimental
-  def countApprox(timeout: Long, confidence: Double = 0.95): PartialResult[BoundedDouble] = {
+  def countApprox(
+      timeout: Long,
+      confidence: Double = 0.95): PartialResult[BoundedDouble] = withScope {
     val countElements: (TaskContext, Iterator[T]) => Long = { (ctx, iter) =>
       var result = 0L
       while (iter.hasNext) {
@@ -1053,7 +1099,7 @@ abstract class RDD[T: ClassTag](
    * To handle very large results, consider using rdd.map(x => (x, 
1L)).reduceByKey(_ + _), which
    * returns an RDD[T, Long] instead of a map.
    */
-  def countByValue()(implicit ord: Ordering[T] = null): Map[T, Long] = {
+  def countByValue()(implicit ord: Ordering[T] = null): Map[T, Long] = withScope {
     map(value => (value, null)).countByKey()
   }
 
@@ -1064,8 +1110,7 @@ abstract class RDD[T: ClassTag](
   @Experimental
   def countByValueApprox(timeout: Long, confidence: Double = 0.95)
       (implicit ord: Ordering[T] = null)
-      : PartialResult[Map[T, BoundedDouble]] =
-  {
+      : PartialResult[Map[T, BoundedDouble]] = withScope {
     if (elementClassTag.runtimeClass.isArray) {
       throw new SparkException("countByValueApprox() does not support arrays")
     }
@@ -1098,7 +1143,7 @@ abstract class RDD[T: ClassTag](
    *           If `sp` equals 0, the sparse representation is skipped.
    */
   @Experimental
-  def countApproxDistinct(p: Int, sp: Int): Long = {
+  def countApproxDistinct(p: Int, sp: Int): Long = withScope {
     require(p >= 4, s"p ($p) must be at least 4")
     require(sp <= 32, s"sp ($sp) cannot be greater than 32")
     require(sp == 0 || p <= sp, s"p ($p) cannot be greater than sp ($sp)")
@@ -1124,7 +1169,7 @@ abstract class RDD[T: ClassTag](
    * @param relativeSD Relative accuracy. Smaller values create counters that 
require more space.
    *                   It must be greater than 0.000017.
    */
-  def countApproxDistinct(relativeSD: Double = 0.05): Long = {
+  def countApproxDistinct(relativeSD: Double = 0.05): Long = withScope {
     val p = math.ceil(2.0 * math.log(1.054 / relativeSD) / math.log(2)).toInt
     countApproxDistinct(p, 0)
   }
@@ -1142,7 +1187,9 @@ abstract class RDD[T: ClassTag](
    * and may even change if the RDD is reevaluated. If a fixed ordering is 
required to guarantee
    * the same index assignments, you should sort the RDD with sortByKey() or 
save it to a file.
    */
-  def zipWithIndex(): RDD[(T, Long)] = new ZippedWithIndexRDD(this)
+  def zipWithIndex(): RDD[(T, Long)] = withScope {
+    new ZippedWithIndexRDD(this)
+  }
 
   /**
    * Zips this RDD with generated unique Long ids. Items in the kth partition 
will get ids k, n+k,
@@ -1154,7 +1201,7 @@ abstract class RDD[T: ClassTag](
    * and may even change if the RDD is reevaluated. If a fixed ordering is 
required to guarantee
    * the same index assignments, you should sort the RDD with sortByKey() or 
save it to a file.
    */
-  def zipWithUniqueId(): RDD[(T, Long)] = {
+  def zipWithUniqueId(): RDD[(T, Long)] = withScope {
     val n = this.partitions.length.toLong
     this.mapPartitionsWithIndex { case (k, iter) =>
       iter.zipWithIndex.map { case (item, i) =>
@@ -1171,48 +1218,50 @@ abstract class RDD[T: ClassTag](
    * @note due to complications in the internal implementation, this method 
will raise
    * an exception if called on an RDD of `Nothing` or `Null`.
    */
-  def take(num: Int): Array[T] = {
+  def take(num: Int): Array[T] = withScope {
     if (num == 0) {
-      return new Array[T](0)
-    }
-
-    val buf = new ArrayBuffer[T]
-    val totalParts = this.partitions.length
-    var partsScanned = 0
-    while (buf.size < num && partsScanned < totalParts) {
-      // The number of partitions to try in this iteration. It is ok for this 
number to be
-      // greater than totalParts because we actually cap it at totalParts in 
runJob.
-      var numPartsToTry = 1
-      if (partsScanned > 0) {
-        // If we didn't find any rows after the previous iteration, quadruple 
and retry. Otherwise,
-        // interpolate the number of partitions we need to try, but 
overestimate it by 50%.
-        // We also cap the estimation in the end.
-        if (buf.size == 0) {
-          numPartsToTry = partsScanned * 4
-        } else {
-          // the left side of max is >=1 whenever partsScanned >= 2
-          numPartsToTry = Math.max((1.5 * num * partsScanned / buf.size).toInt 
- partsScanned, 1)
-          numPartsToTry = Math.min(numPartsToTry, partsScanned * 4) 
+      new Array[T](0)
+    } else {
+      val buf = new ArrayBuffer[T]
+      val totalParts = this.partitions.length
+      var partsScanned = 0
+      while (buf.size < num && partsScanned < totalParts) {
+        // The number of partitions to try in this iteration. It is ok for 
this number to be
+        // greater than totalParts because we actually cap it at totalParts in 
runJob.
+        var numPartsToTry = 1
+        if (partsScanned > 0) {
+          // If we didn't find any rows after the previous iteration, 
quadruple and retry.
+          // Otherwise, interpolate the number of partitions we need to try, 
but overestimate
+          // it by 50%. We also cap the estimation in the end.
+          if (buf.size == 0) {
+            numPartsToTry = partsScanned * 4
+          } else {
+            // the left side of max is >=1 whenever partsScanned >= 2
+            numPartsToTry = Math.max((1.5 * num * partsScanned / 
buf.size).toInt - partsScanned, 1)
+            numPartsToTry = Math.min(numPartsToTry, partsScanned * 4)
+          }
         }
-      }
 
-      val left = num - buf.size
-      val p = partsScanned until math.min(partsScanned + numPartsToTry, 
totalParts)
-      val res = sc.runJob(this, (it: Iterator[T]) => it.take(left).toArray, p, 
allowLocal = true)
+        val left = num - buf.size
+        val p = partsScanned until math.min(partsScanned + numPartsToTry, 
totalParts)
+        val res = sc.runJob(this, (it: Iterator[T]) => it.take(left).toArray, 
p, allowLocal = true)
 
-      res.foreach(buf ++= _.take(num - buf.size))
-      partsScanned += numPartsToTry
-    }
+        res.foreach(buf ++= _.take(num - buf.size))
+        partsScanned += numPartsToTry
+      }
 
-    buf.toArray
+      buf.toArray
+    }
   }
 
   /**
    * Return the first element in this RDD.
    */
-  def first(): T = take(1) match {
-    case Array(t) => t
-    case _ => throw new UnsupportedOperationException("empty collection")
+  def first(): T = withScope {
+    take(1) match {
+      case Array(t) => t
+      case _ => throw new UnsupportedOperationException("empty collection")
+    }
   }
 
   /**
@@ -1230,7 +1279,9 @@ abstract class RDD[T: ClassTag](
    * @param ord the implicit ordering for T
    * @return an array of top elements
    */
-  def top(num: Int)(implicit ord: Ordering[T]): Array[T] = takeOrdered(num)(ord.reverse)
+  def top(num: Int)(implicit ord: Ordering[T]): Array[T] = withScope {
+    takeOrdered(num)(ord.reverse)
+  }
 
   /**
    * Returns the first k (smallest) elements from this RDD as defined by the 
specified
@@ -1248,7 +1299,7 @@ abstract class RDD[T: ClassTag](
    * @param ord the implicit ordering for T
    * @return an array of top elements
    */
-  def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] = {
+  def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] = withScope {
     if (num == 0) {
       Array.empty
     } else {
@@ -1273,13 +1324,17 @@ abstract class RDD[T: ClassTag](
    * Returns the max of this RDD as defined by the implicit Ordering[T].
    * @return the maximum element of the RDD
    * */
-  def max()(implicit ord: Ordering[T]): T = this.reduce(ord.max)
+  def max()(implicit ord: Ordering[T]): T = withScope {
+    this.reduce(ord.max)
+  }
 
   /**
    * Returns the min of this RDD as defined by the implicit Ordering[T].
    * @return the minimum element of the RDD
    * */
-  def min()(implicit ord: Ordering[T]): T = this.reduce(ord.min)
+  def min()(implicit ord: Ordering[T]): T = withScope {
+    this.reduce(ord.min)
+  }
 
   /**
    * @note due to complications in the internal implementation, this method 
will raise an
@@ -1289,12 +1344,14 @@ abstract class RDD[T: ClassTag](
    * @return true if and only if the RDD contains no elements at all. Note 
that an RDD
    *         may be empty even when it has at least 1 partition.
    */
-  def isEmpty(): Boolean = partitions.length == 0 || take(1).length == 0
+  def isEmpty(): Boolean = withScope {
+    partitions.length == 0 || take(1).length == 0
+  }
 
   /**
    * Save this RDD as a text file, using string representations of elements.
    */
-  def saveAsTextFile(path: String) {
+  def saveAsTextFile(path: String): Unit = withScope {
     // https://issues.apache.org/jira/browse/SPARK-2075
     //
     // NullWritable is a `Comparable` in Hadoop 1.+, so the compiler cannot 
find an implicit
@@ -1321,7 +1378,7 @@ abstract class RDD[T: ClassTag](
   /**
    * Save this RDD as a compressed text file, using string representations of 
elements.
    */
-  def saveAsTextFile(path: String, codec: Class[_ <: CompressionCodec]) {
+  def saveAsTextFile(path: String, codec: Class[_ <: CompressionCodec]): Unit = withScope {
     // https://issues.apache.org/jira/browse/SPARK-2075
     val nullWritableClassTag = implicitly[ClassTag[NullWritable]]
     val textClassTag = implicitly[ClassTag[Text]]
@@ -1339,7 +1396,7 @@ abstract class RDD[T: ClassTag](
   /**
    * Save this RDD as a SequenceFile of serialized objects.
    */
-  def saveAsObjectFile(path: String) {
+  def saveAsObjectFile(path: String): Unit = withScope {
     this.mapPartitions(iter => iter.grouped(10).map(_.toArray))
       .map(x => (NullWritable.get(), new BytesWritable(Utils.serialize(x))))
       .saveAsSequenceFile(path)
@@ -1348,12 +1405,12 @@ abstract class RDD[T: ClassTag](
   /**
    * Creates tuples of the elements in this RDD by applying `f`.
    */
-  def keyBy[K](f: T => K): RDD[(K, T)] = {
+  def keyBy[K](f: T => K): RDD[(K, T)] = withScope {
     map(x => (f(x), x))
   }
 
   /** A private method for tests, to look at the contents of each partition */
-  private[spark] def collectPartitions(): Array[Array[T]] = {
+  private[spark] def collectPartitions(): Array[Array[T]] = withScope {
     sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
   }
 
@@ -1392,6 +1449,17 @@ abstract class RDD[T: ClassTag](
   /** User code that created this RDD (e.g. `textFile`, `parallelize`). */
   @transient private[spark] val creationSite = sc.getCallSite()
 
+  /**
+   * The scope associated with the operation that created this RDD.
+   *
+   * This is more flexible than the call site and can be defined hierarchically. For more
+   * detail, see the documentation of {{RDDOperationScope}}. This scope is not defined if the
+   * user instantiates this RDD himself without using any Spark operations.
+   */
+  @transient private[spark] val scope: Option[RDDOperationScope] = {
+    Option(sc.getLocalProperty(SparkContext.RDD_SCOPE_KEY)).map(RDDOperationScope.fromJson)
+  }
+
   private[spark] def getCreationSite: String = 
Option(creationSite).map(_.shortForm).getOrElse("")
 
   private[spark] def elementClassTag: ClassTag[T] = classTag[T]
@@ -1470,7 +1538,7 @@ abstract class RDD[T: ClassTag](
   /** A description of this RDD and its recursive dependencies for debugging. 
*/
   def toDebugString: String = {
     // Get a debug description of an rdd without its children
-    def debugSelf (rdd: RDD[_]): Seq[String] = {
+    def debugSelf(rdd: RDD[_]): Seq[String] = {
       import Utils.bytesToString
 
       val persistence = if (storageLevel != StorageLevel.NONE) 
storageLevel.description else ""
@@ -1527,10 +1595,11 @@ abstract class RDD[T: ClassTag](
         case (desc: String, _) => s"$nextPrefix$desc"
       } ++ debugChildren(rdd, nextPrefix)
     }
-    def debugString(rdd: RDD[_],
-                    prefix: String = "",
-                    isShuffle: Boolean = true,
-                    isLastChild: Boolean = false): Seq[String] = {
+    def debugString(
+        rdd: RDD[_],
+        prefix: String = "",
+        isShuffle: Boolean = true,
+        isLastChild: Boolean = false): Seq[String] = {
       if (isShuffle) {
         shuffleDebugString(rdd, prefix, isLastChild)
       } else {

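The net effect of the RDD.scala changes above is that transformations and most actions now record an operation scope on the RDDs they create. Below is a minimal sketch, not part of this patch, of what that looks like from calling code; the package, object, and app names are hypothetical, and it assumes compilation inside the org.apache.spark package tree because `RDD.scope` is private[spark].

  // Hypothetical sketch: the scope recorded by the new withScope wrappers.
  package org.apache.spark.demo

  import org.apache.spark.{SparkConf, SparkContext}

  object WithScopeSketch {
    def main(args: Array[String]): Unit = {
      val sc = new SparkContext(new SparkConf().setAppName("withscope-sketch").setMaster("local"))
      val doubled = sc.parallelize(1 to 10).map(_ * 2)
      // map is wrapped in withScope, so the MapPartitionsRDD it creates stores the
      // enclosing operation's scope, named after the calling method.
      println(doubled.scope.map(_.name))  // expected: Some(map)
      sc.stop()
    }
  }

Methods such as takeSample are left unwrapped for now because withScope does not allow return statements in the body, as the TODO above notes.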
http://git-wip-us.apache.org/repos/asf/spark/blob/863ec0cb/core/src/main/scala/org/apache/spark/rdd/RDDOperationScope.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDDOperationScope.scala 
b/core/src/main/scala/org/apache/spark/rdd/RDDOperationScope.scala
new file mode 100644
index 0000000..537b56b
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/rdd/RDDOperationScope.scala
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rdd
+
+import java.util.concurrent.atomic.AtomicInteger
+
+import com.fasterxml.jackson.annotation.{JsonIgnore, JsonInclude, JsonPropertyOrder}
+import com.fasterxml.jackson.annotation.JsonInclude.Include
+import com.fasterxml.jackson.databind.ObjectMapper
+import com.fasterxml.jackson.module.scala.DefaultScalaModule
+
+import org.apache.spark.SparkContext
+
+/**
+ * A general, named code block representing an operation that instantiates RDDs.
+ *
+ * All RDDs instantiated in the corresponding code block will store a pointer to this object.
+ * Examples include, but will not be limited to, existing RDD operations, such as textFile,
+ * reduceByKey, and treeAggregate.
+ *
+ * An operation scope may be nested in other scopes. For instance, a SQL query may enclose
+ * scopes associated with the public RDD APIs it uses under the hood.
+ *
+ * There is no particular relationship between an operation scope and a stage or a job.
+ * A scope may live inside one stage (e.g. map) or span across multiple jobs (e.g. take).
+ */
+@JsonInclude(Include.NON_NULL)
+@JsonPropertyOrder(Array("id", "name", "parent"))
+private[spark] class RDDOperationScope(
+    val name: String,
+    val parent: Option[RDDOperationScope] = None) {
+
+  val id: Int = RDDOperationScope.nextScopeId()
+
+  def toJson: String = {
+    RDDOperationScope.jsonMapper.writeValueAsString(this)
+  }
+
+  /**
+   * Return a list of scopes that this scope is a part of, including this scope itself.
+   * The result is ordered from the outermost scope (eldest ancestor) to this scope.
+   */
+  @JsonIgnore
+  def getAllScopes: Seq[RDDOperationScope] = {
+    parent.map(_.getAllScopes).getOrElse(Seq.empty) ++ Seq(this)
+  }
+
+  override def equals(other: Any): Boolean = {
+    other match {
+      case s: RDDOperationScope =>
+        id == s.id && name == s.name && parent == s.parent
+      case _ => false
+    }
+  }
+
+  override def toString: String = toJson
+}
+
+/**
+ * A collection of utility methods to construct a hierarchical representation of RDD scopes.
+ * An RDD scope tracks the series of operations that created a given RDD.
+ */
+private[spark] object RDDOperationScope {
+  private val jsonMapper = new ObjectMapper().registerModule(DefaultScalaModule)
+  private val scopeCounter = new AtomicInteger(0)
+
+  def fromJson(s: String): RDDOperationScope = {
+    jsonMapper.readValue(s, classOf[RDDOperationScope])
+  }
+
+  /** Return a globally unique operation scope ID. */
+  def nextScopeId(): Int = scopeCounter.getAndIncrement
+
+  /**
+   * Execute the given body such that all RDDs created in this body will have the same scope.
+   * The name of the scope will be the name of the method that immediately encloses this one.
+   *
+   * Note: Return statements are NOT allowed in body.
+   */
+  private[spark] def withScope[T](
+      sc: SparkContext,
+      allowNesting: Boolean = false)(body: => T): T = {
+    val callerMethodName = Thread.currentThread.getStackTrace()(3).getMethodName
+    withScope[T](sc, callerMethodName, allowNesting)(body)
+  }
+
+  /**
+   * Execute the given body such that all RDDs created in this body will have the same scope.
+   *
+   * If nesting is allowed, this concatenates the previous scope with the new one in a way that
+   * signifies the hierarchy. Otherwise, if nesting is not allowed, then any children calls to
+   * this method executed in the body will have no effect.
+   *
+   * Note: Return statements are NOT allowed in body.
+   */
+  private[spark] def withScope[T](
+      sc: SparkContext,
+      name: String,
+      allowNesting: Boolean = false)(body: => T): T = {
+    // Save the old scope to restore it later
+    val scopeKey = SparkContext.RDD_SCOPE_KEY
+    val noOverrideKey = SparkContext.RDD_SCOPE_NO_OVERRIDE_KEY
+    val oldScopeJson = sc.getLocalProperty(scopeKey)
+    val oldScope = Option(oldScopeJson).map(RDDOperationScope.fromJson)
+    val oldNoOverride = sc.getLocalProperty(noOverrideKey)
+    try {
+      // Set the scope only if the higher level caller allows us to do so
+      if (sc.getLocalProperty(noOverrideKey) == null) {
+        sc.setLocalProperty(scopeKey, new RDDOperationScope(name, oldScope).toJson)
+      }
+      // Optionally disallow the child body to override our scope
+      if (!allowNesting) {
+        sc.setLocalProperty(noOverrideKey, "true")
+      }
+      body
+    } finally {
+      // Remember to restore any state that was modified before exiting
+      sc.setLocalProperty(scopeKey, oldScopeJson)
+      sc.setLocalProperty(noOverrideKey, oldNoOverride)
+    }
+  }
+}

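A short sketch of how the nesting and save/restore behaviour of RDDOperationScope.withScope plays out. The package, object, and app names are hypothetical, and the sketch assumes compilation inside the org.apache.spark package tree, since the whole API is private[spark]; the printed values are expectations, not observed output.

  // Hypothetical sketch, not part of this patch. With allowNesting = true an enclosed
  // withScope call creates a child scope; with the default (false) the no-override
  // local property suppresses any nested scopes for the duration of the body.
  package org.apache.spark.demo

  import org.apache.spark.{SparkConf, SparkContext}
  import org.apache.spark.rdd.RDDOperationScope

  object NestedScopeSketch {
    def main(args: Array[String]): Unit = {
      val sc = new SparkContext(new SparkConf().setAppName("scope-sketch").setMaster("local"))
      RDDOperationScope.withScope(sc, "outer", allowNesting = true) {
        RDDOperationScope.withScope(sc, "inner") {
          // Any RDD constructed here reads the scope JSON from the SparkContext local
          // property and records "inner", whose parent is "outer".
          val data = sc.parallelize(1 to 3)
          println(data.scope.map(_.getAllScopes.map(_.name)))  // expected: Some(List(outer, inner))
          println(data.scope.map(_.toJson))  // JSON with "name":"inner" and a nested "parent" object
        }
      }
      // Both local properties are restored once the bodies exit.
      sc.stop()
    }
  }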
http://git-wip-us.apache.org/repos/asf/spark/blob/863ec0cb/core/src/main/scala/org/apache/spark/rdd/SequenceFileRDDFunctions.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/rdd/SequenceFileRDDFunctions.scala 
b/core/src/main/scala/org/apache/spark/rdd/SequenceFileRDDFunctions.scala
index 059f896..3dfcf67 100644
--- a/core/src/main/scala/org/apache/spark/rdd/SequenceFileRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/SequenceFileRDDFunctions.scala
@@ -85,7 +85,9 @@ class SequenceFileRDDFunctions[K <% Writable: ClassTag, V <% 
Writable : ClassTag
    * byte arrays to BytesWritable, and Strings to Text. The `path` can be on 
any Hadoop-supported
    * file system.
    */
-  def saveAsSequenceFile(path: String, codec: Option[Class[_ <: CompressionCodec]] = None) {
+  def saveAsSequenceFile(
+      path: String,
+      codec: Option[Class[_ <: CompressionCodec]] = None): Unit = self.withScope {
     def anyToWritable[U <% Writable](u: U): Writable = u
 
     // TODO We cannot force the return type of `anyToWritable` be same as 
keyWritableClass and

http://git-wip-us.apache.org/repos/asf/spark/blob/863ec0cb/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala 
b/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala
index cf3db0b..e439d2a 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala
@@ -33,6 +33,7 @@ class StageInfo(
     val name: String,
     val numTasks: Int,
     val rddInfos: Seq[RDDInfo],
+    val parentIds: Seq[Int],
     val details: String) {
   /** When this stage was submitted from the DAGScheduler to a TaskScheduler. 
*/
   var submissionTime: Option[Long] = None
@@ -78,6 +79,7 @@ private[spark] object StageInfo {
       stage.name,
       numTasks.getOrElse(stage.numTasks),
       rddInfos,
+      stage.parents.map(_.id),
       stage.details)
   }
 }

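The extra parentIds field makes the stage dependency structure visible outside the scheduler. A small, hypothetical sketch (listener and field usage are assumptions, not part of this patch) of a SparkListener consuming it:

  // Hypothetical sketch: reading the new StageInfo.parentIds from an ordinary SparkListener.
  import org.apache.spark.scheduler.{SparkListener, SparkListenerStageSubmitted}

  class StageGraphListener extends SparkListener {
    override def onStageSubmitted(event: SparkListenerStageSubmitted): Unit = {
      val info = event.stageInfo
      // parentIds carries the ids of the parent stages, so a listener can rebuild the
      // stage DAG without reaching into the DAGScheduler.
      println(s"stage ${info.stageId} depends on stages ${info.parentIds.mkString(", ")}")
    }
  }

Such a listener would be registered with, for example, sc.addSparkListener(new StageGraphListener()).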
http://git-wip-us.apache.org/repos/asf/spark/blob/863ec0cb/core/src/main/scala/org/apache/spark/storage/RDDInfo.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/RDDInfo.scala 
b/core/src/main/scala/org/apache/spark/storage/RDDInfo.scala
index ad53a3e..9606262 100644
--- a/core/src/main/scala/org/apache/spark/storage/RDDInfo.scala
+++ b/core/src/main/scala/org/apache/spark/storage/RDDInfo.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.storage
 
 import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.rdd.RDD
+import org.apache.spark.rdd.{RDDOperationScope, RDD}
 import org.apache.spark.util.Utils
 
 @DeveloperApi
@@ -26,7 +26,9 @@ class RDDInfo(
     val id: Int,
     val name: String,
     val numPartitions: Int,
-    var storageLevel: StorageLevel)
+    var storageLevel: StorageLevel,
+    val parentIds: Seq[Int],
+    val scope: Option[RDDOperationScope] = None)
   extends Ordered[RDDInfo] {
 
   var numCachedPartitions = 0
@@ -52,7 +54,8 @@ class RDDInfo(
 
 private[spark] object RDDInfo {
   def fromRdd(rdd: RDD[_]): RDDInfo = {
-    val rddName = Option(rdd.name).getOrElse(rdd.id.toString)
-    new RDDInfo(rdd.id, rddName, rdd.partitions.length, rdd.getStorageLevel)
+    val rddName = Option(rdd.name).getOrElse(Utils.getFormattedClassName(rdd))
+    val parentIds = rdd.dependencies.map(_.rdd.id)
+    new RDDInfo(rdd.id, rddName, rdd.partitions.length, rdd.getStorageLevel, 
parentIds, rdd.scope)
   }
 }

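With the two new fields, RDDInfo.fromRdd carries enough lineage information to draw the RDD graph. A minimal sketch follows; the package, object, and app names are hypothetical, and since the RDDInfo companion is private[spark] the sketch again assumes compilation inside the org.apache.spark package tree.

  // Hypothetical sketch, not part of this patch.
  package org.apache.spark.demo

  import org.apache.spark.{SparkConf, SparkContext}
  import org.apache.spark.storage.RDDInfo

  object RDDInfoSketch {
    def main(args: Array[String]): Unit = {
      val sc = new SparkContext(new SparkConf().setAppName("rddinfo-sketch").setMaster("local"))
      val base = sc.parallelize(1 to 10)
      val doubled = base.map(_ * 2)
      val info = RDDInfo.fromRdd(doubled)
      println(info.name)               // unnamed RDDs now default to the class name, e.g. MapPartitionsRDD
      println(info.parentIds)          // ids of the direct dependencies, here just base.id
      println(info.scope.map(_.name))  // the operation scope recorded by withScope, expected Some(map)
      sc.stop()
    }
  }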
http://git-wip-us.apache.org/repos/asf/spark/blob/863ec0cb/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala 
b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
index 06fce86..a5271f0 100644
--- a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
@@ -25,6 +25,7 @@ import org.apache.spark.ui.env.{EnvironmentListener, 
EnvironmentTab}
 import org.apache.spark.ui.exec.{ExecutorsListener, ExecutorsTab}
 import org.apache.spark.ui.jobs.{JobsTab, JobProgressListener, StagesTab}
 import org.apache.spark.ui.storage.{StorageListener, StorageTab}
+import org.apache.spark.ui.scope.RDDOperationGraphListener
 
 /**
  * Top level user interface for a Spark application.
@@ -38,6 +39,7 @@ private[spark] class SparkUI private (
     val executorsListener: ExecutorsListener,
     val jobProgressListener: JobProgressListener,
     val storageListener: StorageListener,
+    val operationGraphListener: RDDOperationGraphListener,
     var appName: String,
     val basePath: String)
   extends WebUI(securityManager, SparkUI.getUIPort(conf), conf, basePath, 
"SparkUI")
@@ -93,6 +95,9 @@ private[spark] abstract class SparkUITab(parent: SparkUI, 
prefix: String)
 private[spark] object SparkUI {
   val DEFAULT_PORT = 4040
   val STATIC_RESOURCE_DIR = "org/apache/spark/ui/static"
+  val DEFAULT_POOL_NAME = "default"
+  val DEFAULT_RETAINED_STAGES = 1000
+  val DEFAULT_RETAINED_JOBS = 1000
 
   def getUIPort(conf: SparkConf): Int = {
     conf.getInt("spark.ui.port", SparkUI.DEFAULT_PORT)
@@ -144,13 +149,16 @@ private[spark] object SparkUI {
     val storageStatusListener = new StorageStatusListener
     val executorsListener = new ExecutorsListener(storageStatusListener)
     val storageListener = new StorageListener(storageStatusListener)
+    val operationGraphListener = new RDDOperationGraphListener(conf)
 
     listenerBus.addListener(environmentListener)
     listenerBus.addListener(storageStatusListener)
     listenerBus.addListener(executorsListener)
     listenerBus.addListener(storageListener)
+    listenerBus.addListener(operationGraphListener)
 
     new SparkUI(sc, conf, securityManager, environmentListener, 
storageStatusListener,
-      executorsListener, _jobProgressListener, storageListener, appName, 
basePath)
+      executorsListener, _jobProgressListener, storageListener, 
operationGraphListener,
+      appName, basePath)
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/863ec0cb/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala 
b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
index 395af2e..2f3fb18 100644
--- a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
+++ b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
@@ -23,6 +23,7 @@ import java.util.{Locale, Date}
 import scala.xml.{Node, Text}
 
 import org.apache.spark.Logging
+import org.apache.spark.ui.scope.RDDOperationGraph
 
 /** Utility functions for generating XML pages with spark content. */
 private[spark] object UIUtils extends Logging {
@@ -172,13 +173,21 @@ private[spark] object UIUtils extends Logging {
     <script src={prependBaseUri("/static/timeline-view.js")}></script>
   }
 
+  def vizHeaderNodes: Seq[Node] = {
+    <script src={prependBaseUri("/static/d3.min.js")}></script>
+    <script src={prependBaseUri("/static/dagre-d3.min.js")}></script>
+    <script src={prependBaseUri("/static/graphlib-dot.min.js")}></script>
+    <script src={prependBaseUri("/static/spark-dag-viz.js")}></script>
+  }
+
   /** Returns a spark page with correctly formatted headers */
   def headerSparkPage(
       title: String,
       content: => Seq[Node],
       activeTab: SparkUITab,
       refreshInterval: Option[Int] = None,
-      helpText: Option[String] = None): Seq[Node] = {
+      helpText: Option[String] = None,
+      showVisualization: Boolean = false): Seq[Node] = {
 
     val appName = activeTab.appName
     val shortAppName = if (appName.length < 36) appName else appName.take(32) 
+ "..."
@@ -196,6 +205,7 @@ private[spark] object UIUtils extends Logging {
     <html>
       <head>
         {commonHeaderNodes}
+        {if (showVisualization) vizHeaderNodes else Seq.empty}
         <title>{appName} - {title}</title>
       </head>
       <body>
@@ -320,4 +330,47 @@ private[spark] object UIUtils extends Logging {
       <div class="bar bar-running" style={startWidth}></div>
     </div>
   }
+
+  /** Return a "DAG visualization" DOM element that expands into a 
visualization for a stage. */
+  def showDagVizForStage(stageId: Int, graph: Option[RDDOperationGraph]): 
Seq[Node] = {
+    showDagViz(graph.toSeq, forJob = false)
+  }
+
+  /** Return a "DAG visualization" DOM element that expands into a 
visualization for a job. */
+  def showDagVizForJob(jobId: Int, graphs: Seq[RDDOperationGraph]): Seq[Node] 
= {
+    showDagViz(graphs, forJob = true)
+  }
+
+  /**
+   * Return a "DAG visualization" DOM element that expands into a 
visualization on the UI.
+   *
+   * This populates metadata necessary for generating the visualization on the 
front-end in
+   * a format that is expected by spark-dag-viz.js. Any changes in the format 
here must be
+   * reflected there.
+   */
+  private def showDagViz(graphs: Seq[RDDOperationGraph], forJob: Boolean): 
Seq[Node] = {
+    <div>
+      <span class="expand-dag-viz" onclick={s"toggleDagViz($forJob);"}>
+        <span class="expand-dag-viz-arrow arrow-closed"></span>
+        <strong>DAG visualization</strong>
+      </span>
+      <div id="dag-viz-graph"></div>
+      <div id="dag-viz-metadata">
+        {
+          graphs.map { g =>
+            <div class="stage-metadata" stageId={g.rootCluster.id} 
style="display:none">
+              <div class="dot-file">{RDDOperationGraph.makeDotFile(g, 
forJob)}</div>
+              { g.incomingEdges.map { e => <div 
class="incoming-edge">{e.fromId},{e.toId}</div> } }
+              { g.outgoingEdges.map { e => <div 
class="outgoing-edge">{e.fromId},{e.toId}</div> } }
+              {
+                g.rootCluster.getAllNodes.filter(_.cached).map { n =>
+                  <div class="cached-rdd">{n.id}</div>
+                }
+              }
+            </div>
+          }
+        }
+      </div>
+    </div>
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/863ec0cb/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala 
b/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala
index a7ea12b..f6abf27 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala
@@ -179,7 +179,7 @@ private[ui] class AllJobsPage(parent: JobsTab) extends 
WebUIPage("") {
 
     <span class="expand-application-timeline">
       <span class="expand-application-timeline-arrow arrow-closed"></span>
-      <strong>Event Timeline</strong>
+      <strong>Event timeline</strong>
     </span> ++
     <div id="application-timeline" class="collapsed">
       <div class="control-panel">

http://git-wip-us.apache.org/repos/asf/spark/blob/863ec0cb/core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala 
b/core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala
index 527f960..236bc8e 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala
@@ -27,7 +27,7 @@ import org.apache.spark.ui.{WebUIPage, UIUtils}
 /** Page showing list of all ongoing and recently finished stages and pools */
 private[ui] class AllStagesPage(parent: StagesTab) extends WebUIPage("") {
   private val sc = parent.sc
-  private val listener = parent.listener
+  private val listener = parent.progressListener
   private def isFairScheduler = parent.isFairScheduler
 
   def render(request: HttpServletRequest): Seq[Node] = {
@@ -42,18 +42,18 @@ private[ui] class AllStagesPage(parent: StagesTab) extends 
WebUIPage("") {
 
       val activeStagesTable =
         new StageTableBase(activeStages.sortBy(_.submissionTime).reverse,
-          parent.basePath, parent.listener, isFairScheduler = parent.isFairScheduler,
+          parent.basePath, parent.progressListener, isFairScheduler = parent.isFairScheduler,
           killEnabled = parent.killEnabled)
       val pendingStagesTable =
         new StageTableBase(pendingStages.sortBy(_.submissionTime).reverse,
-          parent.basePath, parent.listener, isFairScheduler = parent.isFairScheduler,
+          parent.basePath, parent.progressListener, isFairScheduler = parent.isFairScheduler,
           killEnabled = false)
       val completedStagesTable =
         new StageTableBase(completedStages.sortBy(_.submissionTime).reverse, parent.basePath,
-          parent.listener, isFairScheduler = parent.isFairScheduler, killEnabled = false)
+          parent.progressListener, isFairScheduler = parent.isFairScheduler, killEnabled = false)
       val failedStagesTable =
         new FailedStageTable(failedStages.sortBy(_.submissionTime).reverse, parent.basePath,
-          parent.listener, isFairScheduler = parent.isFairScheduler)
+          parent.progressListener, isFairScheduler = parent.isFairScheduler)
 
       // For now, pool information is only accessible in live UIs
       val pools = sc.map(_.getAllPools).getOrElse(Seq.empty[Schedulable])

http://git-wip-us.apache.org/repos/asf/spark/blob/863ec0cb/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala 
b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
index 1f8536d..d5cdbfa 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
@@ -26,7 +26,7 @@ import org.apache.spark.util.Utils
 
 /** Stage summary grouped by executors. */
 private[ui] class ExecutorTable(stageId: Int, stageAttemptId: Int, parent: StagesTab) {
-  private val listener = parent.listener
+  private val listener = parent.progressListener
 
   def toNodeSeq: Seq[Node] = {
     listener.synchronized {

http://git-wip-us.apache.org/repos/asf/spark/blob/863ec0cb/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala 
b/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala
index dd968e1..96cc3d7 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala
@@ -31,6 +31,7 @@ import org.apache.spark.ui.jobs.UIData.ExecutorUIData
 
 /** Page showing statistics and stage list for a given job */
 private[ui] class JobPage(parent: JobsTab) extends WebUIPage("job") {
+
   private val STAGES_LEGEND =
     <div class="legend-area"><svg width="150px" height="85px">
       <rect class="completed-stage-legend"
@@ -133,7 +134,7 @@ private[ui] class JobPage(parent: JobsTab) extends 
WebUIPage("job") {
     events.toSeq
   }
 
-  private def  makeTimeline(
+  private def makeTimeline(
       stages: Seq[StageInfo],
       executors: HashMap[String, ExecutorUIData],
       appStartTime: Long): Seq[Node] = {
@@ -160,7 +161,7 @@ private[ui] class JobPage(parent: JobsTab) extends 
WebUIPage("job") {
 
     <span class="expand-job-timeline">
       <span class="expand-job-timeline-arrow arrow-closed"></span>
-      <strong>Event Timeline</strong>
+      <strong>Event timeline</strong>
     </span> ++
     <div id="job-timeline" class="collapsed">
       <div class="control-panel">
@@ -198,7 +199,7 @@ private[ui] class JobPage(parent: JobsTab) extends 
WebUIPage("job") {
         // This could be empty if the JobProgressListener hasn't received information about the
         // stage or if the stage information has been garbage collected
         listener.stageIdToInfo.getOrElse(stageId,
-          new StageInfo(stageId, 0, "Unknown", 0, Seq.empty, "Unknown"))
+          new StageInfo(stageId, 0, "Unknown", 0, Seq.empty, Seq.empty, "Unknown"))
       }
 
       val activeStages = Buffer[StageInfo]()
@@ -303,9 +304,14 @@ private[ui] class JobPage(parent: JobsTab) extends 
WebUIPage("job") {
       var content = summary
       val appStartTime = listener.startTime
       val executorListener = parent.executorListener
+      val operationGraphListener = parent.operationGraphListener
+
       content ++= makeTimeline(activeStages ++ completedStages ++ failedStages,
           executorListener.executorIdToData, appStartTime)
 
+      content ++= UIUtils.showDagVizForJob(
+        jobId, operationGraphListener.getOperationGraphForJob(jobId))
+
       if (shouldShowActiveStages) {
         content ++= <h4 id="active">Active Stages ({activeStages.size})</h4> ++
           activeStagesTable.toNodeSeq
@@ -326,7 +332,7 @@ private[ui] class JobPage(parent: JobsTab) extends 
WebUIPage("job") {
         content ++= <h4 id ="failed">Failed Stages ({failedStages.size})</h4> 
++
           failedStagesTable.toNodeSeq
       }
-      UIUtils.headerSparkPage(s"Details for Job $jobId", content, parent)
+      UIUtils.headerSparkPage(s"Details for Job $jobId", content, parent, 
showVisualization = true)
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/863ec0cb/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala 
b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
index d6d716d..8f9aa9f 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -25,6 +25,7 @@ import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.scheduler._
 import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
 import org.apache.spark.storage.BlockManagerId
+import org.apache.spark.ui.SparkUI
 import org.apache.spark.ui.jobs.UIData._
 
 /**
@@ -38,8 +39,6 @@ import org.apache.spark.ui.jobs.UIData._
 @DeveloperApi
 class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
 
-  import JobProgressListener._
-
   // Define a handful of type aliases so that data structures' types can serve as documentation.
   // These type aliases are public because they're used in the types of public fields:
 
@@ -86,8 +85,8 @@ class JobProgressListener(conf: SparkConf) extends 
SparkListener with Logging {
   // To limit the total memory usage of JobProgressListener, we only track information for a fixed
   // number of non-active jobs and stages (there is no limit for active jobs and stages):
 
-  val retainedStages = conf.getInt("spark.ui.retainedStages", DEFAULT_RETAINED_STAGES)
-  val retainedJobs = conf.getInt("spark.ui.retainedJobs", DEFAULT_RETAINED_JOBS)
+  val retainedStages = conf.getInt("spark.ui.retainedStages", SparkUI.DEFAULT_RETAINED_STAGES)
+  val retainedJobs = conf.getInt("spark.ui.retainedJobs", SparkUI.DEFAULT_RETAINED_JOBS)
 
   // We can test for memory leaks by ensuring that collections that track non-active jobs and
   // stages do not grow without bound and that collections for active jobs/stages eventually become
@@ -288,8 +287,8 @@ class JobProgressListener(conf: SparkConf) extends 
SparkListener with Logging {
     activeStages(stage.stageId) = stage
     pendingStages.remove(stage.stageId)
     val poolName = Option(stageSubmitted.properties).map {
-      p => p.getProperty("spark.scheduler.pool", DEFAULT_POOL_NAME)
-    }.getOrElse(DEFAULT_POOL_NAME)
+      p => p.getProperty("spark.scheduler.pool", SparkUI.DEFAULT_POOL_NAME)
+    }.getOrElse(SparkUI.DEFAULT_POOL_NAME)
 
     stageIdToInfo(stage.stageId) = stage
     val stageData = stageIdToData.getOrElseUpdate((stage.stageId, stage.attemptId), new StageUIData)
@@ -524,9 +523,3 @@ class JobProgressListener(conf: SparkConf) extends 
SparkListener with Logging {
     startTime = appStarted.time
   }
 }
-
-private object JobProgressListener {
-  val DEFAULT_POOL_NAME = "default"
-  val DEFAULT_RETAINED_STAGES = 1000
-  val DEFAULT_RETAINED_JOBS = 1000
-}
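The retention limits above remain ordinary SparkConf settings; only their default constants moved from the private JobProgressListener companion into SparkUI. A minimal sketch of overriding them, e.g. in spark-shell (the property names come from this diff; the values are arbitrary):

import org.apache.spark.SparkConf

// Override how many completed stages/jobs the UI listeners keep around
// (defaults now come from SparkUI.DEFAULT_RETAINED_STAGES / DEFAULT_RETAINED_JOBS).
val conf = new SparkConf()
  .set("spark.ui.retainedStages", "500")
  .set("spark.ui.retainedJobs", "200")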

http://git-wip-us.apache.org/repos/asf/spark/blob/863ec0cb/core/src/main/scala/org/apache/spark/ui/jobs/JobsTab.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobsTab.scala 
b/core/src/main/scala/org/apache/spark/ui/jobs/JobsTab.scala
index 342787f..77ca60b 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobsTab.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobsTab.scala
@@ -24,10 +24,12 @@ import org.apache.spark.ui.{SparkUI, SparkUITab}
 private[ui] class JobsTab(parent: SparkUI) extends SparkUITab(parent, "jobs") {
   val sc = parent.sc
   val killEnabled = parent.killEnabled
-  def isFairScheduler: Boolean =
-    jobProgresslistener.schedulingMode.exists(_ == SchedulingMode.FAIR)
   val jobProgresslistener = parent.jobProgressListener
   val executorListener = parent.executorsListener
+  val operationGraphListener = parent.operationGraphListener
+
+  def isFairScheduler: Boolean =
+    jobProgresslistener.schedulingMode.exists(_ == SchedulingMode.FAIR)
 
   attachPage(new AllJobsPage(this))
   attachPage(new JobPage(this))

http://git-wip-us.apache.org/repos/asf/spark/blob/863ec0cb/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala 
b/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala
index f47cdc9..d725b9d 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala
@@ -27,7 +27,7 @@ import org.apache.spark.ui.{WebUIPage, UIUtils}
 /** Page showing specific pool details */
 private[ui] class PoolPage(parent: StagesTab) extends WebUIPage("pool") {
   private val sc = parent.sc
-  private val listener = parent.listener
+  private val listener = parent.progressListener
 
   def render(request: HttpServletRequest): Seq[Node] = {
     listener.synchronized {
@@ -40,7 +40,7 @@ private[ui] class PoolPage(parent: StagesTab) extends 
WebUIPage("pool") {
         case None => Seq[StageInfo]()
       }
       val activeStagesTable = new StageTableBase(activeStages.sortBy(_.submissionTime).reverse,
-        parent.basePath, parent.listener, isFairScheduler = parent.isFairScheduler,
+        parent.basePath, parent.progressListener, isFairScheduler = parent.isFairScheduler,
         killEnabled = parent.killEnabled)
 
       // For now, pool information is only accessible in live UIs

http://git-wip-us.apache.org/repos/asf/spark/blob/863ec0cb/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala 
b/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala
index df1899e..9ba2af5 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala
@@ -25,7 +25,7 @@ import org.apache.spark.ui.UIUtils
 
 /** Table showing list of pools */
 private[ui] class PoolTable(pools: Seq[Schedulable], parent: StagesTab) {
-  private val listener = parent.listener
+  private val listener = parent.progressListener
 
   def toNodeSeq: Seq[Node] = {
     listener.synchronized {

http://git-wip-us.apache.org/repos/asf/spark/blob/863ec0cb/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala 
b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
index 797c940..5793100 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -27,15 +27,17 @@ import org.apache.commons.lang3.StringEscapeUtils
 import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.ui.{ToolTips, WebUIPage, UIUtils}
 import org.apache.spark.ui.jobs.UIData._
+import org.apache.spark.ui.scope.RDDOperationGraph
 import org.apache.spark.util.{Utils, Distribution}
 import org.apache.spark.scheduler.{AccumulableInfo, TaskInfo}
 
 /** Page showing statistics and task list for a given stage */
 private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
-  private val listener = parent.listener
+  private val progressListener = parent.progressListener
+  private val operationGraphListener = parent.operationGraphListener
 
   def render(request: HttpServletRequest): Seq[Node] = {
-    listener.synchronized {
+    progressListener.synchronized {
       val parameterId = request.getParameter("id")
       require(parameterId != null && parameterId.nonEmpty, "Missing id parameter")
 
@@ -44,7 +46,7 @@ private[ui] class StagePage(parent: StagesTab) extends 
WebUIPage("stage") {
 
       val stageId = parameterId.toInt
       val stageAttemptId = parameterAttempt.toInt
-      val stageDataOption = listener.stageIdToData.get((stageId, stageAttemptId))
+      val stageDataOption = progressListener.stageIdToData.get((stageId, stageAttemptId))
 
       if (stageDataOption.isEmpty || stageDataOption.get.taskData.isEmpty) {
         val content =
@@ -60,7 +62,7 @@ private[ui] class StagePage(parent: StagesTab) extends 
WebUIPage("stage") {
       val tasks = stageData.taskData.values.toSeq.sortBy(_.taskInfo.launchTime)
 
       val numCompleted = tasks.count(_.taskInfo.finished)
-      val accumulables = listener.stageIdToData((stageId, stageAttemptId)).accumulables
+      val accumulables = progressListener.stageIdToData((stageId, stageAttemptId)).accumulables
       val hasAccumulators = accumulables.size > 0
 
       val summary =
@@ -169,6 +171,9 @@ private[ui] class StagePage(parent: StagesTab) extends 
WebUIPage("stage") {
           </div>
         </div>
 
+      val dagViz = UIUtils.showDagVizForStage(
+        stageId, operationGraphListener.getOperationGraphForStage(stageId))
+
       val accumulableHeaders: Seq[String] = Seq("Accumulable", "Value")
       def accumulableRow(acc: AccumulableInfo): Elem =
         <tr><td>{acc.name}</td><td>{acc.value}</td></tr>
@@ -434,13 +439,15 @@ private[ui] class StagePage(parent: StagesTab) extends 
WebUIPage("stage") {
       val content =
         summary ++
         showAdditionalMetrics ++
+        dagViz ++
         <h4>Summary Metrics for {numCompleted} Completed Tasks</h4> ++
         <div>{summaryTable.getOrElse("No tasks have reported metrics yet.")}</div> ++
         <h4>Aggregated Metrics by Executor</h4> ++ executorTable.toNodeSeq ++
         maybeAccumulableTable ++
         <h4>Tasks</h4> ++ taskTable
 
-      UIUtils.headerSparkPage("Details for Stage %d".format(stageId), content, 
parent)
+      UIUtils.headerSparkPage(
+        "Details for Stage %d".format(stageId), content, parent, 
showVisualization = true)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/863ec0cb/core/src/main/scala/org/apache/spark/ui/jobs/StagesTab.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagesTab.scala 
b/core/src/main/scala/org/apache/spark/ui/jobs/StagesTab.scala
index 1bd2d87..5516995 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagesTab.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagesTab.scala
@@ -26,19 +26,20 @@ import org.apache.spark.ui.{SparkUI, SparkUITab}
 private[ui] class StagesTab(parent: SparkUI) extends SparkUITab(parent, "stages") {
   val sc = parent.sc
   val killEnabled = parent.killEnabled
-  val listener = parent.jobProgressListener
+  val progressListener = parent.jobProgressListener
+  val operationGraphListener = parent.operationGraphListener
 
   attachPage(new AllStagesPage(this))
   attachPage(new StagePage(this))
   attachPage(new PoolPage(this))
 
-  def isFairScheduler: Boolean = listener.schedulingMode.exists(_ == SchedulingMode.FAIR)
+  def isFairScheduler: Boolean = progressListener.schedulingMode.exists(_ == SchedulingMode.FAIR)
 
   def handleKillRequest(request: HttpServletRequest): Unit = {
     if (killEnabled && parent.securityManager.checkModifyPermissions(request.getRemoteUser)) {
       val killFlag = Option(request.getParameter("terminate")).getOrElse("false").toBoolean
       val stageId = Option(request.getParameter("id")).getOrElse("-1").toInt
-      if (stageId >= 0 && killFlag && listener.activeStages.contains(stageId)) {
+      if (stageId >= 0 && killFlag && progressListener.activeStages.contains(stageId)) {
         sc.get.cancelStage(stageId)
       }
       // Do a quick pause here to give Spark time to kill the stage so it shows up as

http://git-wip-us.apache.org/repos/asf/spark/blob/863ec0cb/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala 
b/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala
new file mode 100644
index 0000000..a18c193
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ui.scope
+
+import scala.collection.mutable
+import scala.collection.mutable.ListBuffer
+
+import org.apache.spark.Logging
+import org.apache.spark.scheduler.StageInfo
+import org.apache.spark.storage.StorageLevel
+
+/**
+ * A representation of a generic cluster graph used for storing information on RDD operations.
+ *
+ * Each graph is defined with a set of edges and a root cluster, which may contain children
+ * nodes and children clusters. Additionally, a graph may also have edges that enter or exit
+ * the graph from nodes that belong to adjacent graphs.
+ */
+private[ui] case class RDDOperationGraph(
+    edges: Seq[RDDOperationEdge],
+    outgoingEdges: Seq[RDDOperationEdge],
+    incomingEdges: Seq[RDDOperationEdge],
+    rootCluster: RDDOperationCluster)
+
+/** A node in an RDDOperationGraph. This represents an RDD. */
+private[ui] case class RDDOperationNode(id: Int, name: String, cached: Boolean)
+
+/**
+ * A directed edge connecting two nodes in an RDDOperationGraph.
+ * This represents an RDD dependency.
+ */
+private[ui] case class RDDOperationEdge(fromId: Int, toId: Int)
+
+/**
+ * A cluster that groups nodes together in an RDDOperationGraph.
+ *
+ * This represents any grouping of RDDs, including operation scopes (e.g. textFile, flatMap),
+ * stages, jobs, or any higher level construct. A cluster may be nested inside of other clusters.
+ */
+private[ui] class RDDOperationCluster(val id: String, val name: String) {
+  private val _childNodes = new ListBuffer[RDDOperationNode]
+  private val _childClusters = new ListBuffer[RDDOperationCluster]
+
+  def childNodes: Seq[RDDOperationNode] = _childNodes.iterator.toSeq
+  def childClusters: Seq[RDDOperationCluster] = _childClusters.iterator.toSeq
+  def attachChildNode(childNode: RDDOperationNode): Unit = { _childNodes += childNode }
+  def attachChildCluster(childCluster: RDDOperationCluster): Unit = {
+    _childClusters += childCluster
+  }
+
+  /** Return all the nodes contained in this cluster, including ones nested in other clusters. */
+  def getAllNodes: Seq[RDDOperationNode] = {
+    _childNodes ++ _childClusters.flatMap(_.getAllNodes)
+  }
+}
+
+private[ui] object RDDOperationGraph extends Logging {
+
+  /**
+   * Construct an RDDOperationGraph for a given stage.
+   *
+   * The root cluster represents the stage, and all children clusters represent RDD operations.
+   * Each node represents an RDD, and each edge represents a dependency between two RDDs pointing
+   * from the parent to the child.
+   *
+   * This does not currently merge common operation scopes across stages. This may be worth
+   * supporting in the future if we decide to group certain stages within the same job under
+   * a common scope (e.g. part of a SQL query).
+   */
+  def makeOperationGraph(stage: StageInfo): RDDOperationGraph = {
+    val edges = new ListBuffer[RDDOperationEdge]
+    val nodes = new mutable.HashMap[Int, RDDOperationNode]
+    val clusters = new mutable.HashMap[String, RDDOperationCluster] // indexed by cluster ID
+
+    // Root cluster is the stage cluster
+    val stageClusterId = s"stage_${stage.stageId}"
+    val stageClusterName = s"Stage ${stage.stageId}" +
+      { if (stage.attemptId == 0) "" else s" (attempt ${stage.attemptId})" }
+    val rootCluster = new RDDOperationCluster(stageClusterId, stageClusterName)
+
+    // Find nodes, edges, and operation scopes that belong to this stage
+    stage.rddInfos.foreach { rdd =>
+      edges ++= rdd.parentIds.map { parentId => RDDOperationEdge(parentId, rdd.id) }
+
+      // TODO: differentiate between the intention to cache an RDD and whether it's actually cached
+      val node = nodes.getOrElseUpdate(
+        rdd.id, RDDOperationNode(rdd.id, rdd.name, rdd.storageLevel != StorageLevel.NONE))
+
+      if (rdd.scope == null) {
+        // This RDD has no encompassing scope, so we put it directly in the root cluster
+        // This should happen only if an RDD is instantiated outside of a public RDD API
+        rootCluster.attachChildNode(node)
+      } else {
+        // Otherwise, this RDD belongs to an inner cluster,
+        // which may be nested inside of other clusters
+        val rddScopes = rdd.scope.map { scope => scope.getAllScopes }.getOrElse(Seq.empty)
+        val rddClusters = rddScopes.map { scope =>
+          val clusterId = scope.name + "_" + scope.id
+          val clusterName = scope.name
+          clusters.getOrElseUpdate(clusterId, new RDDOperationCluster(clusterId, clusterName))
+        }
+        // Build the cluster hierarchy for this RDD
+        rddClusters.sliding(2).foreach { pc =>
+          if (pc.size == 2) {
+            val parentCluster = pc(0)
+            val childCluster = pc(1)
+            parentCluster.attachChildCluster(childCluster)
+          }
+        }
+        // Attach the outermost cluster to the root cluster, and the RDD to the innermost cluster
+        rddClusters.headOption.foreach { cluster => rootCluster.attachChildCluster(cluster) }
+        rddClusters.lastOption.foreach { cluster => cluster.attachChildNode(node) }
+      }
+    }
+
+    // Classify each edge as internal, outgoing or incoming
+    // This information is needed to reason about how stages relate to each other
+    val internalEdges = new ListBuffer[RDDOperationEdge]
+    val outgoingEdges = new ListBuffer[RDDOperationEdge]
+    val incomingEdges = new ListBuffer[RDDOperationEdge]
+    edges.foreach { case e: RDDOperationEdge =>
+      val fromThisGraph = nodes.contains(e.fromId)
+      val toThisGraph = nodes.contains(e.toId)
+      (fromThisGraph, toThisGraph) match {
+        case (true, true) => internalEdges += e
+        case (true, false) => outgoingEdges += e
+        case (false, true) => incomingEdges += e
+        // should never happen
+        case _ => logWarning(s"Found an orphan edge in stage ${stage.stageId}: $e")
+      }
+    }
+
+    RDDOperationGraph(internalEdges, outgoingEdges, incomingEdges, rootCluster)
+  }
+
+  /**
+   * Generate the content of a dot file that describes the specified graph.
+   *
+   * Note that this only uses a minimal subset of features available to the DOT specification.
+   * Part of the styling must be done here because the rendering library must take certain
+   * attributes into account when arranging the graph elements. More style is added in the
+   * visualization later through post-processing in JavaScript.
+   *
+   * For the complete DOT specification, see http://www.graphviz.org/Documentation/dotguide.pdf.
+   */
+  def makeDotFile(graph: RDDOperationGraph, forJob: Boolean): String = {
+    val dotFile = new StringBuilder
+    dotFile.append("digraph G {\n")
+    dotFile.append(makeDotSubgraph(graph.rootCluster, forJob, indent = "  "))
+    graph.edges.foreach { edge =>
+      dotFile.append(s"""  ${edge.fromId}->${edge.toId} [lineInterpolate="basis"];\n""")
+    }
+    dotFile.append("}")
+    val result = dotFile.toString()
+    logDebug(result)
+    result
+  }
+
+  /**
+   * Return the dot representation of a node in an RDDOperationGraph.
+   *
+   * On the job page, it is displayed as a small circle without labels.
+   * On the stage page, it is displayed as a box with an embedded label.
+   */
+  private def makeDotNode(node: RDDOperationNode, forJob: Boolean): String = {
+    if (forJob) {
+      s"""${node.id} [label=" " shape="circle" padding="5" 
labelStyle="font-size: 0"]"""
+    } else {
+      s"""${node.id} [label="${node.name} (${node.id})"]"""
+    }
+  }
+
+  /** Return the dot representation of a subgraph in an RDDOperationGraph. */
+  private def makeDotSubgraph(
+      scope: RDDOperationCluster,
+      forJob: Boolean,
+      indent: String): String = {
+    val subgraph = new StringBuilder
+    subgraph.append(indent + s"subgraph cluster${scope.id} {\n")
+    subgraph.append(indent + s"""  label="${scope.name}";\n""")
+    scope.childNodes.foreach { node =>
+      subgraph.append(indent + s"  ${makeDotNode(node, forJob)};\n")
+    }
+    scope.childClusters.foreach { cscope =>
+      subgraph.append(makeDotSubgraph(cscope, forJob, indent + "  "))
+    }
+    subgraph.append(indent + "}\n")
+    subgraph.toString()
+  }
+}
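To make the generated DOT output concrete, here is a hand-built sketch (hypothetical, not part of this patch; it assumes code under org.apache.spark.ui.scope, such as a test, since these classes are private[ui]). It builds a single stage cluster with two RDD nodes and one internal edge, then prints the digraph produced by makeDotFile:

package org.apache.spark.ui.scope

object DotFileSketch {
  def main(args: Array[String]): Unit = {
    // One stage cluster containing two RDDs connected by a single dependency edge.
    val root = new RDDOperationCluster("stage_0", "Stage 0")
    root.attachChildNode(RDDOperationNode(0, "textFile", cached = false))
    root.attachChildNode(RDDOperationNode(1, "map", cached = true))
    val graph = RDDOperationGraph(
      edges = Seq(RDDOperationEdge(0, 1)),
      outgoingEdges = Seq.empty,
      incomingEdges = Seq.empty,
      rootCluster = root)

    // Prints something like:
    // digraph G {
    //   subgraph clusterstage_0 {
    //     label="Stage 0";
    //     0 [label="textFile (0)"];
    //     1 [label="map (1)"];
    //   }
    //   0->1 [lineInterpolate="basis"];
    // }
    println(RDDOperationGraph.makeDotFile(graph, forJob = false))
  }
}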

http://git-wip-us.apache.org/repos/asf/spark/blob/863ec0cb/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraphListener.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraphListener.scala 
b/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraphListener.scala
new file mode 100644
index 0000000..2884a49
--- /dev/null
+++ 
b/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraphListener.scala
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ui.scope
+
+import scala.collection.mutable
+
+import org.apache.spark.SparkConf
+import org.apache.spark.scheduler._
+import org.apache.spark.ui.SparkUI
+
+/**
+ * A SparkListener that constructs a DAG of RDD operations.
+ */
+private[ui] class RDDOperationGraphListener(conf: SparkConf) extends SparkListener {
+  private val jobIdToStageIds = new mutable.HashMap[Int, Seq[Int]]
+  private val stageIdToGraph = new mutable.HashMap[Int, RDDOperationGraph]
+  private val stageIds = new mutable.ArrayBuffer[Int]
+
+  // How many jobs or stages to retain graph metadata for
+  private val retainedStages =
+    conf.getInt("spark.ui.retainedStages", SparkUI.DEFAULT_RETAINED_STAGES)
+
+  /** Return the graph metadata for all stages in the given job, or an empty Seq if none exists. */
+  def getOperationGraphForJob(jobId: Int): Seq[RDDOperationGraph] = {
+    jobIdToStageIds.get(jobId)
+      .map { sids => sids.flatMap { sid => stageIdToGraph.get(sid) } }
+      .getOrElse { Seq.empty }
+  }
+
+  /** Return the graph metadata for the given stage, or None if no such information exists. */
+  def getOperationGraphForStage(stageId: Int): Option[RDDOperationGraph] = {
+    stageIdToGraph.get(stageId)
+  }
+
+  /** On job start, construct an RDDOperationGraph for each stage in the job for display later. */
+  override def onJobStart(jobStart: SparkListenerJobStart): Unit = synchronized {
+    val jobId = jobStart.jobId
+    val stageInfos = jobStart.stageInfos
+
+    stageInfos.foreach { stageInfo =>
+      stageIds += stageInfo.stageId
+      stageIdToGraph(stageInfo.stageId) = RDDOperationGraph.makeOperationGraph(stageInfo)
+    }
+    jobIdToStageIds(jobId) = stageInfos.map(_.stageId).sorted
+
+    // Remove graph metadata for old stages
+    if (stageIds.size >= retainedStages) {
+      val toRemove = math.max(retainedStages / 10, 1)
+      stageIds.take(toRemove).foreach { id => stageIdToGraph.remove(id) }
+      stageIds.trimStart(toRemove)
+    }
+  }
+}
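And a small test-style sketch of the listener's bookkeeping (hypothetical, not part of this patch; it assumes code under org.apache.spark.ui.scope since the listener is private[ui], the 7-argument StageInfo constructor seen in the JobPage hunk above, and the SparkListenerJobStart(jobId, time, stageInfos) signature): feed it one fabricated job-start event, then read the graphs back by job and by stage.

package org.apache.spark.ui.scope

import org.apache.spark.SparkConf
import org.apache.spark.scheduler.{SparkListenerJobStart, StageInfo}

object GraphListenerSketch {
  def main(args: Array[String]): Unit = {
    val listener = new RDDOperationGraphListener(new SparkConf)

    // Two bare-bones stages; with no RDDInfos, each graph is just an empty stage cluster.
    val stageInfos = Seq(
      new StageInfo(0, 0, "stage 0", 0, Seq.empty, Seq.empty, "details"),
      new StageInfo(1, 0, "stage 1", 0, Seq.empty, Seq.empty, "details"))
    listener.onJobStart(SparkListenerJobStart(jobId = 0, time = 0L, stageInfos))

    assert(listener.getOperationGraphForJob(0).size == 2)
    assert(listener.getOperationGraphForStage(1).isDefined)
    assert(listener.getOperationGraphForJob(42).isEmpty) // unknown job => empty Seq
  }
}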

