http://git-wip-us.apache.org/repos/asf/spark/blob/863ec0cb/core/src/main/scala/org/apache/spark/rdd/RDD.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index 31c07c7..7f7c7ed 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -25,7 +25,7 @@ import scala.language.implicitConversions import scala.reflect.{classTag, ClassTag} import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus -import org.apache.hadoop.io.{Writable, BytesWritable, NullWritable, Text} +import org.apache.hadoop.io.{BytesWritable, NullWritable, Text} import org.apache.hadoop.io.compress.CompressionCodec import org.apache.hadoop.mapred.TextOutputFormat @@ -277,12 +277,20 @@ abstract class RDD[T: ClassTag]( if (isCheckpointed) firstParent[T].iterator(split, context) else compute(split, context) } + /** + * Execute a block of code in a scope such that all new RDDs created in this body will + * be part of the same scope. For more detail, see {{org.apache.spark.rdd.RDDOperationScope}}. + * + * Note: Return statements are NOT allowed in the given body. + */ + private[spark] def withScope[U](body: => U): U = RDDOperationScope.withScope[U](sc)(body) + // Transformations (return a new RDD) /** * Return a new RDD by applying a function to all elements of this RDD. */ - def map[U: ClassTag](f: T => U): RDD[U] = { + def map[U: ClassTag](f: T => U): RDD[U] = withScope { val cleanF = sc.clean(f) new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF)) } @@ -291,7 +299,7 @@ abstract class RDD[T: ClassTag]( * Return a new RDD by first applying a function to all elements of this * RDD, and then flattening the results. */ - def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = { + def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = withScope { val cleanF = sc.clean(f) new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.flatMap(cleanF)) } @@ -299,7 +307,7 @@ abstract class RDD[T: ClassTag]( /** * Return a new RDD containing only the elements that satisfy a predicate. */ - def filter(f: T => Boolean): RDD[T] = { + def filter(f: T => Boolean): RDD[T] = withScope { val cleanF = sc.clean(f) new MapPartitionsRDD[T, T]( this, @@ -310,13 +318,16 @@ abstract class RDD[T: ClassTag]( /** * Return a new RDD containing the distinct elements in this RDD. */ - def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = + def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope { map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1) + } /** * Return a new RDD containing the distinct elements in this RDD. */ - def distinct(): RDD[T] = distinct(partitions.length) + def distinct(): RDD[T] = withScope { + distinct(partitions.length) + } /** * Return a new RDD that has exactly numPartitions partitions. @@ -327,7 +338,7 @@ abstract class RDD[T: ClassTag]( * If you are decreasing the number of partitions in this RDD, consider using `coalesce`, * which can avoid performing a shuffle. */ - def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = { + def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope { coalesce(numPartitions, shuffle = true) } @@ -352,7 +363,7 @@ abstract class RDD[T: ClassTag]( * data distributed using a hash partitioner. */ def coalesce(numPartitions: Int, shuffle: Boolean = false)(implicit ord: Ordering[T] = null) - : RDD[T] = { + : RDD[T] = withScope { if (shuffle) { /** Distributes elements evenly across output partitions, starting from a random partition. */ val distributePartition = (index: Int, items: Iterator[T]) => { @@ -377,16 +388,17 @@ abstract class RDD[T: ClassTag]( /** * Return a sampled subset of this RDD. - * + * * @param withReplacement can elements be sampled multiple times (replaced when sampled out) * @param fraction expected size of the sample as a fraction of this RDD's size * without replacement: probability that each element is chosen; fraction must be [0, 1] * with replacement: expected number of times each element is chosen; fraction must be >= 0 * @param seed seed for the random number generator */ - def sample(withReplacement: Boolean, + def sample( + withReplacement: Boolean, fraction: Double, - seed: Long = Utils.random.nextLong): RDD[T] = { + seed: Long = Utils.random.nextLong): RDD[T] = withScope { require(fraction >= 0.0, "Negative fraction value: " + fraction) if (withReplacement) { new PartitionwiseSampledRDD[T, T](this, new PoissonSampler[T](fraction), true, seed) @@ -403,7 +415,9 @@ abstract class RDD[T: ClassTag]( * * @return split RDDs in an array */ - def randomSplit(weights: Array[Double], seed: Long = Utils.random.nextLong): Array[RDD[T]] = { + def randomSplit( + weights: Array[Double], + seed: Long = Utils.random.nextLong): Array[RDD[T]] = withScope { val sum = weights.sum val normalizedCumWeights = weights.map(_ / sum).scanLeft(0.0d)(_ + _) normalizedCumWeights.sliding(2).map { x => @@ -435,7 +449,9 @@ abstract class RDD[T: ClassTag]( * @param seed seed for the random number generator * @return sample of specified size in an array */ - def takeSample(withReplacement: Boolean, + // TODO: rewrite this without return statements so we can wrap it in a scope + def takeSample( + withReplacement: Boolean, num: Int, seed: Long = Utils.random.nextLong): Array[T] = { val numStDev = 10.0 @@ -483,7 +499,7 @@ abstract class RDD[T: ClassTag]( * Return the union of this RDD and another one. Any identical elements will appear multiple * times (use `.distinct()` to eliminate them). */ - def union(other: RDD[T]): RDD[T] = { + def union(other: RDD[T]): RDD[T] = withScope { if (partitioner.isDefined && other.partitioner == partitioner) { new PartitionerAwareUnionRDD(sc, Array(this, other)) } else { @@ -495,7 +511,9 @@ abstract class RDD[T: ClassTag]( * Return the union of this RDD and another one. Any identical elements will appear multiple * times (use `.distinct()` to eliminate them). */ - def ++(other: RDD[T]): RDD[T] = this.union(other) + def ++(other: RDD[T]): RDD[T] = withScope { + this.union(other) + } /** * Return this RDD sorted by the given key function. @@ -504,10 +522,11 @@ abstract class RDD[T: ClassTag]( f: (T) => K, ascending: Boolean = true, numPartitions: Int = this.partitions.length) - (implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T] = + (implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T] = withScope { this.keyBy[K](f) .sortByKey(ascending, numPartitions) .values + } /** * Return the intersection of this RDD and another one. The output will not contain any duplicate @@ -515,7 +534,7 @@ abstract class RDD[T: ClassTag]( * * Note that this method performs a shuffle internally. */ - def intersection(other: RDD[T]): RDD[T] = { + def intersection(other: RDD[T]): RDD[T] = withScope { this.map(v => (v, null)).cogroup(other.map(v => (v, null))) .filter { case (_, (leftGroup, rightGroup)) => leftGroup.nonEmpty && rightGroup.nonEmpty } .keys @@ -529,8 +548,9 @@ abstract class RDD[T: ClassTag]( * * @param partitioner Partitioner to use for the resulting RDD */ - def intersection(other: RDD[T], partitioner: Partitioner)(implicit ord: Ordering[T] = null) - : RDD[T] = { + def intersection( + other: RDD[T], + partitioner: Partitioner)(implicit ord: Ordering[T] = null): RDD[T] = withScope { this.map(v => (v, null)).cogroup(other.map(v => (v, null)), partitioner) .filter { case (_, (leftGroup, rightGroup)) => leftGroup.nonEmpty && rightGroup.nonEmpty } .keys @@ -544,16 +564,14 @@ abstract class RDD[T: ClassTag]( * * @param numPartitions How many partitions to use in the resulting RDD */ - def intersection(other: RDD[T], numPartitions: Int): RDD[T] = { - this.map(v => (v, null)).cogroup(other.map(v => (v, null)), new HashPartitioner(numPartitions)) - .filter { case (_, (leftGroup, rightGroup)) => leftGroup.nonEmpty && rightGroup.nonEmpty } - .keys + def intersection(other: RDD[T], numPartitions: Int): RDD[T] = withScope { + intersection(other, new HashPartitioner(numPartitions)) } /** * Return an RDD created by coalescing all elements within each partition into an array. */ - def glom(): RDD[Array[T]] = { + def glom(): RDD[Array[T]] = withScope { new MapPartitionsRDD[Array[T], T](this, (context, pid, iter) => Iterator(iter.toArray)) } @@ -561,7 +579,9 @@ abstract class RDD[T: ClassTag]( * Return the Cartesian product of this RDD and another one, that is, the RDD of all pairs of * elements (a, b) where a is in `this` and b is in `other`. */ - def cartesian[U: ClassTag](other: RDD[U]): RDD[(T, U)] = new CartesianRDD(sc, this, other) + def cartesian[U: ClassTag](other: RDD[U]): RDD[(T, U)] = withScope { + new CartesianRDD(sc, this, other) + } /** * Return an RDD of grouped items. Each group consists of a key and a sequence of elements @@ -572,8 +592,9 @@ abstract class RDD[T: ClassTag]( * aggregation (such as a sum or average) over each key, using [[PairRDDFunctions.aggregateByKey]] * or [[PairRDDFunctions.reduceByKey]] will provide much better performance. */ - def groupBy[K](f: T => K)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])] = + def groupBy[K](f: T => K)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])] = withScope { groupBy[K](f, defaultPartitioner(this)) + } /** * Return an RDD of grouped elements. Each group consists of a key and a sequence of elements @@ -584,8 +605,11 @@ abstract class RDD[T: ClassTag]( * aggregation (such as a sum or average) over each key, using [[PairRDDFunctions.aggregateByKey]] * or [[PairRDDFunctions.reduceByKey]] will provide much better performance. */ - def groupBy[K](f: T => K, numPartitions: Int)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])] = + def groupBy[K]( + f: T => K, + numPartitions: Int)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])] = withScope { groupBy(f, new HashPartitioner(numPartitions)) + } /** * Return an RDD of grouped items. Each group consists of a key and a sequence of elements @@ -597,7 +621,7 @@ abstract class RDD[T: ClassTag]( * or [[PairRDDFunctions.reduceByKey]] will provide much better performance. */ def groupBy[K](f: T => K, p: Partitioner)(implicit kt: ClassTag[K], ord: Ordering[K] = null) - : RDD[(K, Iterable[T])] = { + : RDD[(K, Iterable[T])] = withScope { val cleanF = sc.clean(f) this.map(t => (cleanF(t), t)).groupByKey(p) } @@ -605,13 +629,16 @@ abstract class RDD[T: ClassTag]( /** * Return an RDD created by piping elements to a forked external process. */ - def pipe(command: String): RDD[String] = new PipedRDD(this, command) + def pipe(command: String): RDD[String] = withScope { + new PipedRDD(this, command) + } /** * Return an RDD created by piping elements to a forked external process. */ - def pipe(command: String, env: Map[String, String]): RDD[String] = + def pipe(command: String, env: Map[String, String]): RDD[String] = withScope { new PipedRDD(this, command, env) + } /** * Return an RDD created by piping elements to a forked external process. @@ -619,7 +646,7 @@ abstract class RDD[T: ClassTag]( * * @param command command to run in forked process. * @param env environment variables to set. - * @param printPipeContext Before piping elements, this function is called as an oppotunity + * @param printPipeContext Before piping elements, this function is called as an opportunity * to pipe context data. Print line function (like out.println) will be * passed as printPipeContext's parameter. * @param printRDDElement Use this function to customize how to pipe elements. This function @@ -637,7 +664,7 @@ abstract class RDD[T: ClassTag]( env: Map[String, String] = Map(), printPipeContext: (String => Unit) => Unit = null, printRDDElement: (T, String => Unit) => Unit = null, - separateWorkingDir: Boolean = false): RDD[String] = { + separateWorkingDir: Boolean = false): RDD[String] = withScope { new PipedRDD(this, command, env, if (printPipeContext ne null) sc.clean(printPipeContext) else null, if (printRDDElement ne null) sc.clean(printRDDElement) else null, @@ -651,7 +678,7 @@ abstract class RDD[T: ClassTag]( * should be `false` unless this is a pair RDD and the input function doesn't modify the keys. */ def mapPartitions[U: ClassTag]( - f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean = false): RDD[U] = { + f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean = false): RDD[U] = withScope { val func = (context: TaskContext, index: Int, iter: Iterator[T]) => f(iter) new MapPartitionsRDD(this, sc.clean(func), preservesPartitioning) } @@ -664,7 +691,8 @@ abstract class RDD[T: ClassTag]( * should be `false` unless this is a pair RDD and the input function doesn't modify the keys. */ def mapPartitionsWithIndex[U: ClassTag]( - f: (Int, Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false): RDD[U] = { + f: (Int, Iterator[T]) => Iterator[U], + preservesPartitioning: Boolean = false): RDD[U] = withScope { val func = (context: TaskContext, index: Int, iter: Iterator[T]) => f(index, iter) new MapPartitionsRDD(this, sc.clean(func), preservesPartitioning) } @@ -681,7 +709,7 @@ abstract class RDD[T: ClassTag]( @deprecated("use TaskContext.get", "1.2.0") def mapPartitionsWithContext[U: ClassTag]( f: (TaskContext, Iterator[T]) => Iterator[U], - preservesPartitioning: Boolean = false): RDD[U] = { + preservesPartitioning: Boolean = false): RDD[U] = withScope { val func = (context: TaskContext, index: Int, iter: Iterator[T]) => f(context, iter) new MapPartitionsRDD(this, sc.clean(func), preservesPartitioning) } @@ -692,7 +720,8 @@ abstract class RDD[T: ClassTag]( */ @deprecated("use mapPartitionsWithIndex", "0.7.0") def mapPartitionsWithSplit[U: ClassTag]( - f: (Int, Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false): RDD[U] = { + f: (Int, Iterator[T]) => Iterator[U], + preservesPartitioning: Boolean = false): RDD[U] = withScope { mapPartitionsWithIndex(f, preservesPartitioning) } @@ -704,7 +733,7 @@ abstract class RDD[T: ClassTag]( @deprecated("use mapPartitionsWithIndex", "1.0.0") def mapWith[A, U: ClassTag] (constructA: Int => A, preservesPartitioning: Boolean = false) - (f: (T, A) => U): RDD[U] = { + (f: (T, A) => U): RDD[U] = withScope { mapPartitionsWithIndex((index, iter) => { val a = constructA(index) iter.map(t => f(t, a)) @@ -719,7 +748,7 @@ abstract class RDD[T: ClassTag]( @deprecated("use mapPartitionsWithIndex and flatMap", "1.0.0") def flatMapWith[A, U: ClassTag] (constructA: Int => A, preservesPartitioning: Boolean = false) - (f: (T, A) => Seq[U]): RDD[U] = { + (f: (T, A) => Seq[U]): RDD[U] = withScope { mapPartitionsWithIndex((index, iter) => { val a = constructA(index) iter.flatMap(t => f(t, a)) @@ -732,11 +761,11 @@ abstract class RDD[T: ClassTag]( * partition with the index of that partition. */ @deprecated("use mapPartitionsWithIndex and foreach", "1.0.0") - def foreachWith[A](constructA: Int => A)(f: (T, A) => Unit) { + def foreachWith[A](constructA: Int => A)(f: (T, A) => Unit): Unit = withScope { mapPartitionsWithIndex { (index, iter) => val a = constructA(index) iter.map(t => {f(t, a); t}) - }.foreach(_ => {}) + } } /** @@ -745,7 +774,7 @@ abstract class RDD[T: ClassTag]( * partition with the index of that partition. */ @deprecated("use mapPartitionsWithIndex and filter", "1.0.0") - def filterWith[A](constructA: Int => A)(p: (T, A) => Boolean): RDD[T] = { + def filterWith[A](constructA: Int => A)(p: (T, A) => Boolean): RDD[T] = withScope { mapPartitionsWithIndex((index, iter) => { val a = constructA(index) iter.filter(t => p(t, a)) @@ -758,7 +787,7 @@ abstract class RDD[T: ClassTag]( * partitions* and the *same number of elements in each partition* (e.g. one was made through * a map on the other). */ - def zip[U: ClassTag](other: RDD[U]): RDD[(T, U)] = { + def zip[U: ClassTag](other: RDD[U]): RDD[(T, U)] = withScope { zipPartitions(other, preservesPartitioning = false) { (thisIter, otherIter) => new Iterator[(T, U)] { def hasNext: Boolean = (thisIter.hasNext, otherIter.hasNext) match { @@ -780,33 +809,39 @@ abstract class RDD[T: ClassTag]( */ def zipPartitions[B: ClassTag, V: ClassTag] (rdd2: RDD[B], preservesPartitioning: Boolean) - (f: (Iterator[T], Iterator[B]) => Iterator[V]): RDD[V] = + (f: (Iterator[T], Iterator[B]) => Iterator[V]): RDD[V] = withScope { new ZippedPartitionsRDD2(sc, sc.clean(f), this, rdd2, preservesPartitioning) + } def zipPartitions[B: ClassTag, V: ClassTag] (rdd2: RDD[B]) - (f: (Iterator[T], Iterator[B]) => Iterator[V]): RDD[V] = - new ZippedPartitionsRDD2(sc, sc.clean(f), this, rdd2, false) + (f: (Iterator[T], Iterator[B]) => Iterator[V]): RDD[V] = withScope { + zipPartitions(rdd2, preservesPartitioning = false)(f) + } def zipPartitions[B: ClassTag, C: ClassTag, V: ClassTag] (rdd2: RDD[B], rdd3: RDD[C], preservesPartitioning: Boolean) - (f: (Iterator[T], Iterator[B], Iterator[C]) => Iterator[V]): RDD[V] = + (f: (Iterator[T], Iterator[B], Iterator[C]) => Iterator[V]): RDD[V] = withScope { new ZippedPartitionsRDD3(sc, sc.clean(f), this, rdd2, rdd3, preservesPartitioning) + } def zipPartitions[B: ClassTag, C: ClassTag, V: ClassTag] (rdd2: RDD[B], rdd3: RDD[C]) - (f: (Iterator[T], Iterator[B], Iterator[C]) => Iterator[V]): RDD[V] = - new ZippedPartitionsRDD3(sc, sc.clean(f), this, rdd2, rdd3, false) + (f: (Iterator[T], Iterator[B], Iterator[C]) => Iterator[V]): RDD[V] = withScope { + zipPartitions(rdd2, rdd3, preservesPartitioning = false)(f) + } def zipPartitions[B: ClassTag, C: ClassTag, D: ClassTag, V: ClassTag] (rdd2: RDD[B], rdd3: RDD[C], rdd4: RDD[D], preservesPartitioning: Boolean) - (f: (Iterator[T], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V]): RDD[V] = + (f: (Iterator[T], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V]): RDD[V] = withScope { new ZippedPartitionsRDD4(sc, sc.clean(f), this, rdd2, rdd3, rdd4, preservesPartitioning) + } def zipPartitions[B: ClassTag, C: ClassTag, D: ClassTag, V: ClassTag] (rdd2: RDD[B], rdd3: RDD[C], rdd4: RDD[D]) - (f: (Iterator[T], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V]): RDD[V] = - new ZippedPartitionsRDD4(sc, sc.clean(f), this, rdd2, rdd3, rdd4, false) + (f: (Iterator[T], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V]): RDD[V] = withScope { + zipPartitions(rdd2, rdd3, rdd4, preservesPartitioning = false)(f) + } // Actions (launch a job to return a value to the user program) @@ -814,7 +849,7 @@ abstract class RDD[T: ClassTag]( /** * Applies a function f to all elements of this RDD. */ - def foreach(f: T => Unit) { + def foreach(f: T => Unit): Unit = withScope { val cleanF = sc.clean(f) sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF)) } @@ -822,7 +857,7 @@ abstract class RDD[T: ClassTag]( /** * Applies a function f to each partition of this RDD. */ - def foreachPartition(f: Iterator[T] => Unit) { + def foreachPartition(f: Iterator[T] => Unit): Unit = withScope { val cleanF = sc.clean(f) sc.runJob(this, (iter: Iterator[T]) => cleanF(iter)) } @@ -830,7 +865,7 @@ abstract class RDD[T: ClassTag]( /** * Return an array that contains all of the elements in this RDD. */ - def collect(): Array[T] = { + def collect(): Array[T] = withScope { val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray) Array.concat(results: _*) } @@ -840,7 +875,7 @@ abstract class RDD[T: ClassTag]( * * The iterator will consume as much memory as the largest partition in this RDD. */ - def toLocalIterator: Iterator[T] = { + def toLocalIterator: Iterator[T] = withScope { def collectPartition(p: Int): Array[T] = { sc.runJob(this, (iter: Iterator[T]) => iter.toArray, Seq(p), allowLocal = false).head } @@ -851,12 +886,14 @@ abstract class RDD[T: ClassTag]( * Return an array that contains all of the elements in this RDD. */ @deprecated("use collect", "1.0.0") - def toArray(): Array[T] = collect() + def toArray(): Array[T] = withScope { + collect() + } /** * Return an RDD that contains all matching values by applying `f`. */ - def collect[U: ClassTag](f: PartialFunction[T, U]): RDD[U] = { + def collect[U: ClassTag](f: PartialFunction[T, U]): RDD[U] = withScope { filter(f.isDefinedAt).map(f) } @@ -866,19 +903,23 @@ abstract class RDD[T: ClassTag]( * Uses `this` partitioner/partition size, because even if `other` is huge, the resulting * RDD will be <= us. */ - def subtract(other: RDD[T]): RDD[T] = + def subtract(other: RDD[T]): RDD[T] = withScope { subtract(other, partitioner.getOrElse(new HashPartitioner(partitions.length))) + } /** * Return an RDD with the elements from `this` that are not in `other`. */ - def subtract(other: RDD[T], numPartitions: Int): RDD[T] = + def subtract(other: RDD[T], numPartitions: Int): RDD[T] = withScope { subtract(other, new HashPartitioner(numPartitions)) + } /** * Return an RDD with the elements from `this` that are not in `other`. */ - def subtract(other: RDD[T], p: Partitioner)(implicit ord: Ordering[T] = null): RDD[T] = { + def subtract( + other: RDD[T], + p: Partitioner)(implicit ord: Ordering[T] = null): RDD[T] = withScope { if (partitioner == Some(p)) { // Our partitioner knows how to handle T (which, since we have a partitioner, is // really (K, V)) so make a new Partitioner that will de-tuple our fake tuples @@ -900,7 +941,7 @@ abstract class RDD[T: ClassTag]( * Reduces the elements of this RDD using the specified commutative and * associative binary operator. */ - def reduce(f: (T, T) => T): T = { + def reduce(f: (T, T) => T): T = withScope { val cleanF = sc.clean(f) val reducePartition: Iterator[T] => Option[T] = iter => { if (iter.hasNext) { @@ -929,7 +970,7 @@ abstract class RDD[T: ClassTag]( * @param depth suggested depth of the tree (default: 2) * @see [[org.apache.spark.rdd.RDD#reduce]] */ - def treeReduce(f: (T, T) => T, depth: Int = 2): T = { + def treeReduce(f: (T, T) => T, depth: Int = 2): T = withScope { require(depth >= 1, s"Depth must be greater than or equal to 1 but got $depth.") val cleanF = context.clean(f) val reducePartition: Iterator[T] => Option[T] = iter => { @@ -961,7 +1002,7 @@ abstract class RDD[T: ClassTag]( * modify t1 and return it as its result value to avoid object allocation; however, it should not * modify t2. */ - def fold(zeroValue: T)(op: (T, T) => T): T = { + def fold(zeroValue: T)(op: (T, T) => T): T = withScope { // Clone the zero value since we will also be serializing it as part of tasks var jobResult = Utils.clone(zeroValue, sc.env.closureSerializer.newInstance()) val cleanOp = sc.clean(op) @@ -979,7 +1020,7 @@ abstract class RDD[T: ClassTag]( * allowed to modify and return their first argument instead of creating a new U to avoid memory * allocation. */ - def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U = { + def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U = withScope { // Clone the zero value since we will also be serializing it as part of tasks var jobResult = Utils.clone(zeroValue, sc.env.serializer.newInstance()) val cleanSeqOp = sc.clean(seqOp) @@ -999,26 +1040,29 @@ abstract class RDD[T: ClassTag]( def treeAggregate[U: ClassTag](zeroValue: U)( seqOp: (U, T) => U, combOp: (U, U) => U, - depth: Int = 2): U = { + depth: Int = 2): U = withScope { require(depth >= 1, s"Depth must be greater than or equal to 1 but got $depth.") if (partitions.length == 0) { - return Utils.clone(zeroValue, context.env.closureSerializer.newInstance()) - } - val cleanSeqOp = context.clean(seqOp) - val cleanCombOp = context.clean(combOp) - val aggregatePartition = (it: Iterator[T]) => it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp) - var partiallyAggregated = mapPartitions(it => Iterator(aggregatePartition(it))) - var numPartitions = partiallyAggregated.partitions.length - val scale = math.max(math.ceil(math.pow(numPartitions, 1.0 / depth)).toInt, 2) - // If creating an extra level doesn't help reduce the wall-clock time, we stop tree aggregation. - while (numPartitions > scale + numPartitions / scale) { - numPartitions /= scale - val curNumPartitions = numPartitions - partiallyAggregated = partiallyAggregated.mapPartitionsWithIndex { (i, iter) => - iter.map((i % curNumPartitions, _)) - }.reduceByKey(new HashPartitioner(curNumPartitions), cleanCombOp).values + Utils.clone(zeroValue, context.env.closureSerializer.newInstance()) + } else { + val cleanSeqOp = context.clean(seqOp) + val cleanCombOp = context.clean(combOp) + val aggregatePartition = + (it: Iterator[T]) => it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp) + var partiallyAggregated = mapPartitions(it => Iterator(aggregatePartition(it))) + var numPartitions = partiallyAggregated.partitions.length + val scale = math.max(math.ceil(math.pow(numPartitions, 1.0 / depth)).toInt, 2) + // If creating an extra level doesn't help reduce + // the wall-clock time, we stop tree aggregation. + while (numPartitions > scale + numPartitions / scale) { + numPartitions /= scale + val curNumPartitions = numPartitions + partiallyAggregated = partiallyAggregated.mapPartitionsWithIndex { + (i, iter) => iter.map((i % curNumPartitions, _)) + }.reduceByKey(new HashPartitioner(curNumPartitions), cleanCombOp).values + } + partiallyAggregated.reduce(cleanCombOp) } - partiallyAggregated.reduce(cleanCombOp) } /** @@ -1032,7 +1076,9 @@ abstract class RDD[T: ClassTag]( * within a timeout, even if not all tasks have finished. */ @Experimental - def countApprox(timeout: Long, confidence: Double = 0.95): PartialResult[BoundedDouble] = { + def countApprox( + timeout: Long, + confidence: Double = 0.95): PartialResult[BoundedDouble] = withScope { val countElements: (TaskContext, Iterator[T]) => Long = { (ctx, iter) => var result = 0L while (iter.hasNext) { @@ -1053,7 +1099,7 @@ abstract class RDD[T: ClassTag]( * To handle very large results, consider using rdd.map(x => (x, 1L)).reduceByKey(_ + _), which * returns an RDD[T, Long] instead of a map. */ - def countByValue()(implicit ord: Ordering[T] = null): Map[T, Long] = { + def countByValue()(implicit ord: Ordering[T] = null): Map[T, Long] = withScope { map(value => (value, null)).countByKey() } @@ -1064,8 +1110,7 @@ abstract class RDD[T: ClassTag]( @Experimental def countByValueApprox(timeout: Long, confidence: Double = 0.95) (implicit ord: Ordering[T] = null) - : PartialResult[Map[T, BoundedDouble]] = - { + : PartialResult[Map[T, BoundedDouble]] = withScope { if (elementClassTag.runtimeClass.isArray) { throw new SparkException("countByValueApprox() does not support arrays") } @@ -1098,7 +1143,7 @@ abstract class RDD[T: ClassTag]( * If `sp` equals 0, the sparse representation is skipped. */ @Experimental - def countApproxDistinct(p: Int, sp: Int): Long = { + def countApproxDistinct(p: Int, sp: Int): Long = withScope { require(p >= 4, s"p ($p) must be at least 4") require(sp <= 32, s"sp ($sp) cannot be greater than 32") require(sp == 0 || p <= sp, s"p ($p) cannot be greater than sp ($sp)") @@ -1124,7 +1169,7 @@ abstract class RDD[T: ClassTag]( * @param relativeSD Relative accuracy. Smaller values create counters that require more space. * It must be greater than 0.000017. */ - def countApproxDistinct(relativeSD: Double = 0.05): Long = { + def countApproxDistinct(relativeSD: Double = 0.05): Long = withScope { val p = math.ceil(2.0 * math.log(1.054 / relativeSD) / math.log(2)).toInt countApproxDistinct(p, 0) } @@ -1142,7 +1187,9 @@ abstract class RDD[T: ClassTag]( * and may even change if the RDD is reevaluated. If a fixed ordering is required to guarantee * the same index assignments, you should sort the RDD with sortByKey() or save it to a file. */ - def zipWithIndex(): RDD[(T, Long)] = new ZippedWithIndexRDD(this) + def zipWithIndex(): RDD[(T, Long)] = withScope { + new ZippedWithIndexRDD(this) + } /** * Zips this RDD with generated unique Long ids. Items in the kth partition will get ids k, n+k, @@ -1154,7 +1201,7 @@ abstract class RDD[T: ClassTag]( * and may even change if the RDD is reevaluated. If a fixed ordering is required to guarantee * the same index assignments, you should sort the RDD with sortByKey() or save it to a file. */ - def zipWithUniqueId(): RDD[(T, Long)] = { + def zipWithUniqueId(): RDD[(T, Long)] = withScope { val n = this.partitions.length.toLong this.mapPartitionsWithIndex { case (k, iter) => iter.zipWithIndex.map { case (item, i) => @@ -1171,48 +1218,50 @@ abstract class RDD[T: ClassTag]( * @note due to complications in the internal implementation, this method will raise * an exception if called on an RDD of `Nothing` or `Null`. */ - def take(num: Int): Array[T] = { + def take(num: Int): Array[T] = withScope { if (num == 0) { - return new Array[T](0) - } - - val buf = new ArrayBuffer[T] - val totalParts = this.partitions.length - var partsScanned = 0 - while (buf.size < num && partsScanned < totalParts) { - // The number of partitions to try in this iteration. It is ok for this number to be - // greater than totalParts because we actually cap it at totalParts in runJob. - var numPartsToTry = 1 - if (partsScanned > 0) { - // If we didn't find any rows after the previous iteration, quadruple and retry. Otherwise, - // interpolate the number of partitions we need to try, but overestimate it by 50%. - // We also cap the estimation in the end. - if (buf.size == 0) { - numPartsToTry = partsScanned * 4 - } else { - // the left side of max is >=1 whenever partsScanned >= 2 - numPartsToTry = Math.max((1.5 * num * partsScanned / buf.size).toInt - partsScanned, 1) - numPartsToTry = Math.min(numPartsToTry, partsScanned * 4) + new Array[T](0) + } else { + val buf = new ArrayBuffer[T] + val totalParts = this.partitions.length + var partsScanned = 0 + while (buf.size < num && partsScanned < totalParts) { + // The number of partitions to try in this iteration. It is ok for this number to be + // greater than totalParts because we actually cap it at totalParts in runJob. + var numPartsToTry = 1 + if (partsScanned > 0) { + // If we didn't find any rows after the previous iteration, quadruple and retry. + // Otherwise, interpolate the number of partitions we need to try, but overestimate + // it by 50%. We also cap the estimation in the end. + if (buf.size == 0) { + numPartsToTry = partsScanned * 4 + } else { + // the left side of max is >=1 whenever partsScanned >= 2 + numPartsToTry = Math.max((1.5 * num * partsScanned / buf.size).toInt - partsScanned, 1) + numPartsToTry = Math.min(numPartsToTry, partsScanned * 4) + } } - } - val left = num - buf.size - val p = partsScanned until math.min(partsScanned + numPartsToTry, totalParts) - val res = sc.runJob(this, (it: Iterator[T]) => it.take(left).toArray, p, allowLocal = true) + val left = num - buf.size + val p = partsScanned until math.min(partsScanned + numPartsToTry, totalParts) + val res = sc.runJob(this, (it: Iterator[T]) => it.take(left).toArray, p, allowLocal = true) - res.foreach(buf ++= _.take(num - buf.size)) - partsScanned += numPartsToTry - } + res.foreach(buf ++= _.take(num - buf.size)) + partsScanned += numPartsToTry + } - buf.toArray + buf.toArray + } } /** * Return the first element in this RDD. */ - def first(): T = take(1) match { - case Array(t) => t - case _ => throw new UnsupportedOperationException("empty collection") + def first(): T = withScope { + take(1) match { + case Array(t) => t + case _ => throw new UnsupportedOperationException("empty collection") + } } /** @@ -1230,7 +1279,9 @@ abstract class RDD[T: ClassTag]( * @param ord the implicit ordering for T * @return an array of top elements */ - def top(num: Int)(implicit ord: Ordering[T]): Array[T] = takeOrdered(num)(ord.reverse) + def top(num: Int)(implicit ord: Ordering[T]): Array[T] = withScope { + takeOrdered(num)(ord.reverse) + } /** * Returns the first k (smallest) elements from this RDD as defined by the specified @@ -1248,7 +1299,7 @@ abstract class RDD[T: ClassTag]( * @param ord the implicit ordering for T * @return an array of top elements */ - def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] = { + def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] = withScope { if (num == 0) { Array.empty } else { @@ -1273,13 +1324,17 @@ abstract class RDD[T: ClassTag]( * Returns the max of this RDD as defined by the implicit Ordering[T]. * @return the maximum element of the RDD * */ - def max()(implicit ord: Ordering[T]): T = this.reduce(ord.max) + def max()(implicit ord: Ordering[T]): T = withScope { + this.reduce(ord.max) + } /** * Returns the min of this RDD as defined by the implicit Ordering[T]. * @return the minimum element of the RDD * */ - def min()(implicit ord: Ordering[T]): T = this.reduce(ord.min) + def min()(implicit ord: Ordering[T]): T = withScope { + this.reduce(ord.min) + } /** * @note due to complications in the internal implementation, this method will raise an @@ -1289,12 +1344,14 @@ abstract class RDD[T: ClassTag]( * @return true if and only if the RDD contains no elements at all. Note that an RDD * may be empty even when it has at least 1 partition. */ - def isEmpty(): Boolean = partitions.length == 0 || take(1).length == 0 + def isEmpty(): Boolean = withScope { + partitions.length == 0 || take(1).length == 0 + } /** * Save this RDD as a text file, using string representations of elements. */ - def saveAsTextFile(path: String) { + def saveAsTextFile(path: String): Unit = withScope { // https://issues.apache.org/jira/browse/SPARK-2075 // // NullWritable is a `Comparable` in Hadoop 1.+, so the compiler cannot find an implicit @@ -1321,7 +1378,7 @@ abstract class RDD[T: ClassTag]( /** * Save this RDD as a compressed text file, using string representations of elements. */ - def saveAsTextFile(path: String, codec: Class[_ <: CompressionCodec]) { + def saveAsTextFile(path: String, codec: Class[_ <: CompressionCodec]): Unit = withScope { // https://issues.apache.org/jira/browse/SPARK-2075 val nullWritableClassTag = implicitly[ClassTag[NullWritable]] val textClassTag = implicitly[ClassTag[Text]] @@ -1339,7 +1396,7 @@ abstract class RDD[T: ClassTag]( /** * Save this RDD as a SequenceFile of serialized objects. */ - def saveAsObjectFile(path: String) { + def saveAsObjectFile(path: String): Unit = withScope { this.mapPartitions(iter => iter.grouped(10).map(_.toArray)) .map(x => (NullWritable.get(), new BytesWritable(Utils.serialize(x)))) .saveAsSequenceFile(path) @@ -1348,12 +1405,12 @@ abstract class RDD[T: ClassTag]( /** * Creates tuples of the elements in this RDD by applying `f`. */ - def keyBy[K](f: T => K): RDD[(K, T)] = { + def keyBy[K](f: T => K): RDD[(K, T)] = withScope { map(x => (f(x), x)) } /** A private method for tests, to look at the contents of each partition */ - private[spark] def collectPartitions(): Array[Array[T]] = { + private[spark] def collectPartitions(): Array[Array[T]] = withScope { sc.runJob(this, (iter: Iterator[T]) => iter.toArray) } @@ -1392,6 +1449,17 @@ abstract class RDD[T: ClassTag]( /** User code that created this RDD (e.g. `textFile`, `parallelize`). */ @transient private[spark] val creationSite = sc.getCallSite() + /** + * The scope associated with the operation that created this RDD. + * + * This is more flexible than the call site and can be defined hierarchically. For more + * detail, see the documentation of {{RDDOperationScope}}. This scope is not defined if the + * user instantiates this RDD himself without using any Spark operations. + */ + @transient private[spark] val scope: Option[RDDOperationScope] = { + Option(sc.getLocalProperty(SparkContext.RDD_SCOPE_KEY)).map(RDDOperationScope.fromJson) + } + private[spark] def getCreationSite: String = Option(creationSite).map(_.shortForm).getOrElse("") private[spark] def elementClassTag: ClassTag[T] = classTag[T] @@ -1470,7 +1538,7 @@ abstract class RDD[T: ClassTag]( /** A description of this RDD and its recursive dependencies for debugging. */ def toDebugString: String = { // Get a debug description of an rdd without its children - def debugSelf (rdd: RDD[_]): Seq[String] = { + def debugSelf(rdd: RDD[_]): Seq[String] = { import Utils.bytesToString val persistence = if (storageLevel != StorageLevel.NONE) storageLevel.description else "" @@ -1527,10 +1595,11 @@ abstract class RDD[T: ClassTag]( case (desc: String, _) => s"$nextPrefix$desc" } ++ debugChildren(rdd, nextPrefix) } - def debugString(rdd: RDD[_], - prefix: String = "", - isShuffle: Boolean = true, - isLastChild: Boolean = false): Seq[String] = { + def debugString( + rdd: RDD[_], + prefix: String = "", + isShuffle: Boolean = true, + isLastChild: Boolean = false): Seq[String] = { if (isShuffle) { shuffleDebugString(rdd, prefix, isLastChild) } else {
http://git-wip-us.apache.org/repos/asf/spark/blob/863ec0cb/core/src/main/scala/org/apache/spark/rdd/RDDOperationScope.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/rdd/RDDOperationScope.scala b/core/src/main/scala/org/apache/spark/rdd/RDDOperationScope.scala new file mode 100644 index 0000000..537b56b --- /dev/null +++ b/core/src/main/scala/org/apache/spark/rdd/RDDOperationScope.scala @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.rdd + +import java.util.concurrent.atomic.AtomicInteger + +import com.fasterxml.jackson.annotation.{JsonIgnore, JsonInclude, JsonPropertyOrder} +import com.fasterxml.jackson.annotation.JsonInclude.Include +import com.fasterxml.jackson.databind.ObjectMapper +import com.fasterxml.jackson.module.scala.DefaultScalaModule + +import org.apache.spark.SparkContext + +/** + * A general, named code block representing an operation that instantiates RDDs. + * + * All RDDs instantiated in the corresponding code block will store a pointer to this object. + * Examples include, but will not be limited to, existing RDD operations, such as textFile, + * reduceByKey, and treeAggregate. + * + * An operation scope may be nested in other scopes. For instance, a SQL query may enclose + * scopes associated with the public RDD APIs it uses under the hood. + * + * There is no particular relationship between an operation scope and a stage or a job. + * A scope may live inside one stage (e.g. map) or span across multiple jobs (e.g. take). + */ +@JsonInclude(Include.NON_NULL) +@JsonPropertyOrder(Array("id", "name", "parent")) +private[spark] class RDDOperationScope( + val name: String, + val parent: Option[RDDOperationScope] = None) { + + val id: Int = RDDOperationScope.nextScopeId() + + def toJson: String = { + RDDOperationScope.jsonMapper.writeValueAsString(this) + } + + /** + * Return a list of scopes that this scope is a part of, including this scope itself. + * The result is ordered from the outermost scope (eldest ancestor) to this scope. + */ + @JsonIgnore + def getAllScopes: Seq[RDDOperationScope] = { + parent.map(_.getAllScopes).getOrElse(Seq.empty) ++ Seq(this) + } + + override def equals(other: Any): Boolean = { + other match { + case s: RDDOperationScope => + id == s.id && name == s.name && parent == s.parent + case _ => false + } + } + + override def toString: String = toJson +} + +/** + * A collection of utility methods to construct a hierarchical representation of RDD scopes. + * An RDD scope tracks the series of operations that created a given RDD. + */ +private[spark] object RDDOperationScope { + private val jsonMapper = new ObjectMapper().registerModule(DefaultScalaModule) + private val scopeCounter = new AtomicInteger(0) + + def fromJson(s: String): RDDOperationScope = { + jsonMapper.readValue(s, classOf[RDDOperationScope]) + } + + /** Return a globally unique operation scope ID. */ + def nextScopeId(): Int = scopeCounter.getAndIncrement + + /** + * Execute the given body such that all RDDs created in this body will have the same scope. + * The name of the scope will be the name of the method that immediately encloses this one. + * + * Note: Return statements are NOT allowed in body. + */ + private[spark] def withScope[T]( + sc: SparkContext, + allowNesting: Boolean = false)(body: => T): T = { + val callerMethodName = Thread.currentThread.getStackTrace()(3).getMethodName + withScope[T](sc, callerMethodName, allowNesting)(body) + } + + /** + * Execute the given body such that all RDDs created in this body will have the same scope. + * + * If nesting is allowed, this concatenates the previous scope with the new one in a way that + * signifies the hierarchy. Otherwise, if nesting is not allowed, then any children calls to + * this method executed in the body will have no effect. + * + * Note: Return statements are NOT allowed in body. + */ + private[spark] def withScope[T]( + sc: SparkContext, + name: String, + allowNesting: Boolean = false)(body: => T): T = { + // Save the old scope to restore it later + val scopeKey = SparkContext.RDD_SCOPE_KEY + val noOverrideKey = SparkContext.RDD_SCOPE_NO_OVERRIDE_KEY + val oldScopeJson = sc.getLocalProperty(scopeKey) + val oldScope = Option(oldScopeJson).map(RDDOperationScope.fromJson) + val oldNoOverride = sc.getLocalProperty(noOverrideKey) + try { + // Set the scope only if the higher level caller allows us to do so + if (sc.getLocalProperty(noOverrideKey) == null) { + sc.setLocalProperty(scopeKey, new RDDOperationScope(name, oldScope).toJson) + } + // Optionally disallow the child body to override our scope + if (!allowNesting) { + sc.setLocalProperty(noOverrideKey, "true") + } + body + } finally { + // Remember to restore any state that was modified before exiting + sc.setLocalProperty(scopeKey, oldScopeJson) + sc.setLocalProperty(noOverrideKey, oldNoOverride) + } + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/863ec0cb/core/src/main/scala/org/apache/spark/rdd/SequenceFileRDDFunctions.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/rdd/SequenceFileRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/SequenceFileRDDFunctions.scala index 059f896..3dfcf67 100644 --- a/core/src/main/scala/org/apache/spark/rdd/SequenceFileRDDFunctions.scala +++ b/core/src/main/scala/org/apache/spark/rdd/SequenceFileRDDFunctions.scala @@ -85,7 +85,9 @@ class SequenceFileRDDFunctions[K <% Writable: ClassTag, V <% Writable : ClassTag * byte arrays to BytesWritable, and Strings to Text. The `path` can be on any Hadoop-supported * file system. */ - def saveAsSequenceFile(path: String, codec: Option[Class[_ <: CompressionCodec]] = None) { + def saveAsSequenceFile( + path: String, + codec: Option[Class[_ <: CompressionCodec]] = None): Unit = self.withScope { def anyToWritable[U <% Writable](u: U): Writable = u // TODO We cannot force the return type of `anyToWritable` be same as keyWritableClass and http://git-wip-us.apache.org/repos/asf/spark/blob/863ec0cb/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala index cf3db0b..e439d2a 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala @@ -33,6 +33,7 @@ class StageInfo( val name: String, val numTasks: Int, val rddInfos: Seq[RDDInfo], + val parentIds: Seq[Int], val details: String) { /** When this stage was submitted from the DAGScheduler to a TaskScheduler. */ var submissionTime: Option[Long] = None @@ -78,6 +79,7 @@ private[spark] object StageInfo { stage.name, numTasks.getOrElse(stage.numTasks), rddInfos, + stage.parents.map(_.id), stage.details) } } http://git-wip-us.apache.org/repos/asf/spark/blob/863ec0cb/core/src/main/scala/org/apache/spark/storage/RDDInfo.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/storage/RDDInfo.scala b/core/src/main/scala/org/apache/spark/storage/RDDInfo.scala index ad53a3e..9606262 100644 --- a/core/src/main/scala/org/apache/spark/storage/RDDInfo.scala +++ b/core/src/main/scala/org/apache/spark/storage/RDDInfo.scala @@ -18,7 +18,7 @@ package org.apache.spark.storage import org.apache.spark.annotation.DeveloperApi -import org.apache.spark.rdd.RDD +import org.apache.spark.rdd.{RDDOperationScope, RDD} import org.apache.spark.util.Utils @DeveloperApi @@ -26,7 +26,9 @@ class RDDInfo( val id: Int, val name: String, val numPartitions: Int, - var storageLevel: StorageLevel) + var storageLevel: StorageLevel, + val parentIds: Seq[Int], + val scope: Option[RDDOperationScope] = None) extends Ordered[RDDInfo] { var numCachedPartitions = 0 @@ -52,7 +54,8 @@ class RDDInfo( private[spark] object RDDInfo { def fromRdd(rdd: RDD[_]): RDDInfo = { - val rddName = Option(rdd.name).getOrElse(rdd.id.toString) - new RDDInfo(rdd.id, rddName, rdd.partitions.length, rdd.getStorageLevel) + val rddName = Option(rdd.name).getOrElse(Utils.getFormattedClassName(rdd)) + val parentIds = rdd.dependencies.map(_.rdd.id) + new RDDInfo(rdd.id, rddName, rdd.partitions.length, rdd.getStorageLevel, parentIds, rdd.scope) } } http://git-wip-us.apache.org/repos/asf/spark/blob/863ec0cb/core/src/main/scala/org/apache/spark/ui/SparkUI.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala index 06fce86..a5271f0 100644 --- a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala @@ -25,6 +25,7 @@ import org.apache.spark.ui.env.{EnvironmentListener, EnvironmentTab} import org.apache.spark.ui.exec.{ExecutorsListener, ExecutorsTab} import org.apache.spark.ui.jobs.{JobsTab, JobProgressListener, StagesTab} import org.apache.spark.ui.storage.{StorageListener, StorageTab} +import org.apache.spark.ui.scope.RDDOperationGraphListener /** * Top level user interface for a Spark application. @@ -38,6 +39,7 @@ private[spark] class SparkUI private ( val executorsListener: ExecutorsListener, val jobProgressListener: JobProgressListener, val storageListener: StorageListener, + val operationGraphListener: RDDOperationGraphListener, var appName: String, val basePath: String) extends WebUI(securityManager, SparkUI.getUIPort(conf), conf, basePath, "SparkUI") @@ -93,6 +95,9 @@ private[spark] abstract class SparkUITab(parent: SparkUI, prefix: String) private[spark] object SparkUI { val DEFAULT_PORT = 4040 val STATIC_RESOURCE_DIR = "org/apache/spark/ui/static" + val DEFAULT_POOL_NAME = "default" + val DEFAULT_RETAINED_STAGES = 1000 + val DEFAULT_RETAINED_JOBS = 1000 def getUIPort(conf: SparkConf): Int = { conf.getInt("spark.ui.port", SparkUI.DEFAULT_PORT) @@ -144,13 +149,16 @@ private[spark] object SparkUI { val storageStatusListener = new StorageStatusListener val executorsListener = new ExecutorsListener(storageStatusListener) val storageListener = new StorageListener(storageStatusListener) + val operationGraphListener = new RDDOperationGraphListener(conf) listenerBus.addListener(environmentListener) listenerBus.addListener(storageStatusListener) listenerBus.addListener(executorsListener) listenerBus.addListener(storageListener) + listenerBus.addListener(operationGraphListener) new SparkUI(sc, conf, securityManager, environmentListener, storageStatusListener, - executorsListener, _jobProgressListener, storageListener, appName, basePath) + executorsListener, _jobProgressListener, storageListener, operationGraphListener, + appName, basePath) } } http://git-wip-us.apache.org/repos/asf/spark/blob/863ec0cb/core/src/main/scala/org/apache/spark/ui/UIUtils.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala index 395af2e..2f3fb18 100644 --- a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala +++ b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala @@ -23,6 +23,7 @@ import java.util.{Locale, Date} import scala.xml.{Node, Text} import org.apache.spark.Logging +import org.apache.spark.ui.scope.RDDOperationGraph /** Utility functions for generating XML pages with spark content. */ private[spark] object UIUtils extends Logging { @@ -172,13 +173,21 @@ private[spark] object UIUtils extends Logging { <script src={prependBaseUri("/static/timeline-view.js")}></script> } + def vizHeaderNodes: Seq[Node] = { + <script src={prependBaseUri("/static/d3.min.js")}></script> + <script src={prependBaseUri("/static/dagre-d3.min.js")}></script> + <script src={prependBaseUri("/static/graphlib-dot.min.js")}></script> + <script src={prependBaseUri("/static/spark-dag-viz.js")}></script> + } + /** Returns a spark page with correctly formatted headers */ def headerSparkPage( title: String, content: => Seq[Node], activeTab: SparkUITab, refreshInterval: Option[Int] = None, - helpText: Option[String] = None): Seq[Node] = { + helpText: Option[String] = None, + showVisualization: Boolean = false): Seq[Node] = { val appName = activeTab.appName val shortAppName = if (appName.length < 36) appName else appName.take(32) + "..." @@ -196,6 +205,7 @@ private[spark] object UIUtils extends Logging { <html> <head> {commonHeaderNodes} + {if (showVisualization) vizHeaderNodes else Seq.empty} <title>{appName} - {title}</title> </head> <body> @@ -320,4 +330,47 @@ private[spark] object UIUtils extends Logging { <div class="bar bar-running" style={startWidth}></div> </div> } + + /** Return a "DAG visualization" DOM element that expands into a visualization for a stage. */ + def showDagVizForStage(stageId: Int, graph: Option[RDDOperationGraph]): Seq[Node] = { + showDagViz(graph.toSeq, forJob = false) + } + + /** Return a "DAG visualization" DOM element that expands into a visualization for a job. */ + def showDagVizForJob(jobId: Int, graphs: Seq[RDDOperationGraph]): Seq[Node] = { + showDagViz(graphs, forJob = true) + } + + /** + * Return a "DAG visualization" DOM element that expands into a visualization on the UI. + * + * This populates metadata necessary for generating the visualization on the front-end in + * a format that is expected by spark-dag-viz.js. Any changes in the format here must be + * reflected there. + */ + private def showDagViz(graphs: Seq[RDDOperationGraph], forJob: Boolean): Seq[Node] = { + <div> + <span class="expand-dag-viz" onclick={s"toggleDagViz($forJob);"}> + <span class="expand-dag-viz-arrow arrow-closed"></span> + <strong>DAG visualization</strong> + </span> + <div id="dag-viz-graph"></div> + <div id="dag-viz-metadata"> + { + graphs.map { g => + <div class="stage-metadata" stageId={g.rootCluster.id} style="display:none"> + <div class="dot-file">{RDDOperationGraph.makeDotFile(g, forJob)}</div> + { g.incomingEdges.map { e => <div class="incoming-edge">{e.fromId},{e.toId}</div> } } + { g.outgoingEdges.map { e => <div class="outgoing-edge">{e.fromId},{e.toId}</div> } } + { + g.rootCluster.getAllNodes.filter(_.cached).map { n => + <div class="cached-rdd">{n.id}</div> + } + } + </div> + } + } + </div> + </div> + } } http://git-wip-us.apache.org/repos/asf/spark/blob/863ec0cb/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala index a7ea12b..f6abf27 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala @@ -179,7 +179,7 @@ private[ui] class AllJobsPage(parent: JobsTab) extends WebUIPage("") { <span class="expand-application-timeline"> <span class="expand-application-timeline-arrow arrow-closed"></span> - <strong>Event Timeline</strong> + <strong>Event timeline</strong> </span> ++ <div id="application-timeline" class="collapsed"> <div class="control-panel"> http://git-wip-us.apache.org/repos/asf/spark/blob/863ec0cb/core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala index 527f960..236bc8e 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala @@ -27,7 +27,7 @@ import org.apache.spark.ui.{WebUIPage, UIUtils} /** Page showing list of all ongoing and recently finished stages and pools */ private[ui] class AllStagesPage(parent: StagesTab) extends WebUIPage("") { private val sc = parent.sc - private val listener = parent.listener + private val listener = parent.progressListener private def isFairScheduler = parent.isFairScheduler def render(request: HttpServletRequest): Seq[Node] = { @@ -42,18 +42,18 @@ private[ui] class AllStagesPage(parent: StagesTab) extends WebUIPage("") { val activeStagesTable = new StageTableBase(activeStages.sortBy(_.submissionTime).reverse, - parent.basePath, parent.listener, isFairScheduler = parent.isFairScheduler, + parent.basePath, parent.progressListener, isFairScheduler = parent.isFairScheduler, killEnabled = parent.killEnabled) val pendingStagesTable = new StageTableBase(pendingStages.sortBy(_.submissionTime).reverse, - parent.basePath, parent.listener, isFairScheduler = parent.isFairScheduler, + parent.basePath, parent.progressListener, isFairScheduler = parent.isFairScheduler, killEnabled = false) val completedStagesTable = new StageTableBase(completedStages.sortBy(_.submissionTime).reverse, parent.basePath, - parent.listener, isFairScheduler = parent.isFairScheduler, killEnabled = false) + parent.progressListener, isFairScheduler = parent.isFairScheduler, killEnabled = false) val failedStagesTable = new FailedStageTable(failedStages.sortBy(_.submissionTime).reverse, parent.basePath, - parent.listener, isFairScheduler = parent.isFairScheduler) + parent.progressListener, isFairScheduler = parent.isFairScheduler) // For now, pool information is only accessible in live UIs val pools = sc.map(_.getAllPools).getOrElse(Seq.empty[Schedulable]) http://git-wip-us.apache.org/repos/asf/spark/blob/863ec0cb/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala index 1f8536d..d5cdbfa 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala @@ -26,7 +26,7 @@ import org.apache.spark.util.Utils /** Stage summary grouped by executors. */ private[ui] class ExecutorTable(stageId: Int, stageAttemptId: Int, parent: StagesTab) { - private val listener = parent.listener + private val listener = parent.progressListener def toNodeSeq: Seq[Node] = { listener.synchronized { http://git-wip-us.apache.org/repos/asf/spark/blob/863ec0cb/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala index dd968e1..96cc3d7 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala @@ -31,6 +31,7 @@ import org.apache.spark.ui.jobs.UIData.ExecutorUIData /** Page showing statistics and stage list for a given job */ private[ui] class JobPage(parent: JobsTab) extends WebUIPage("job") { + private val STAGES_LEGEND = <div class="legend-area"><svg width="150px" height="85px"> <rect class="completed-stage-legend" @@ -133,7 +134,7 @@ private[ui] class JobPage(parent: JobsTab) extends WebUIPage("job") { events.toSeq } - private def makeTimeline( + private def makeTimeline( stages: Seq[StageInfo], executors: HashMap[String, ExecutorUIData], appStartTime: Long): Seq[Node] = { @@ -160,7 +161,7 @@ private[ui] class JobPage(parent: JobsTab) extends WebUIPage("job") { <span class="expand-job-timeline"> <span class="expand-job-timeline-arrow arrow-closed"></span> - <strong>Event Timeline</strong> + <strong>Event timeline</strong> </span> ++ <div id="job-timeline" class="collapsed"> <div class="control-panel"> @@ -198,7 +199,7 @@ private[ui] class JobPage(parent: JobsTab) extends WebUIPage("job") { // This could be empty if the JobProgressListener hasn't received information about the // stage or if the stage information has been garbage collected listener.stageIdToInfo.getOrElse(stageId, - new StageInfo(stageId, 0, "Unknown", 0, Seq.empty, "Unknown")) + new StageInfo(stageId, 0, "Unknown", 0, Seq.empty, Seq.empty, "Unknown")) } val activeStages = Buffer[StageInfo]() @@ -303,9 +304,14 @@ private[ui] class JobPage(parent: JobsTab) extends WebUIPage("job") { var content = summary val appStartTime = listener.startTime val executorListener = parent.executorListener + val operationGraphListener = parent.operationGraphListener + content ++= makeTimeline(activeStages ++ completedStages ++ failedStages, executorListener.executorIdToData, appStartTime) + content ++= UIUtils.showDagVizForJob( + jobId, operationGraphListener.getOperationGraphForJob(jobId)) + if (shouldShowActiveStages) { content ++= <h4 id="active">Active Stages ({activeStages.size})</h4> ++ activeStagesTable.toNodeSeq @@ -326,7 +332,7 @@ private[ui] class JobPage(parent: JobsTab) extends WebUIPage("job") { content ++= <h4 id ="failed">Failed Stages ({failedStages.size})</h4> ++ failedStagesTable.toNodeSeq } - UIUtils.headerSparkPage(s"Details for Job $jobId", content, parent) + UIUtils.headerSparkPage(s"Details for Job $jobId", content, parent, showVisualization = true) } } } http://git-wip-us.apache.org/repos/asf/spark/blob/863ec0cb/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala index d6d716d..8f9aa9f 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala @@ -25,6 +25,7 @@ import org.apache.spark.executor.TaskMetrics import org.apache.spark.scheduler._ import org.apache.spark.scheduler.SchedulingMode.SchedulingMode import org.apache.spark.storage.BlockManagerId +import org.apache.spark.ui.SparkUI import org.apache.spark.ui.jobs.UIData._ /** @@ -38,8 +39,6 @@ import org.apache.spark.ui.jobs.UIData._ @DeveloperApi class JobProgressListener(conf: SparkConf) extends SparkListener with Logging { - import JobProgressListener._ - // Define a handful of type aliases so that data structures' types can serve as documentation. // These type aliases are public because they're used in the types of public fields: @@ -86,8 +85,8 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging { // To limit the total memory usage of JobProgressListener, we only track information for a fixed // number of non-active jobs and stages (there is no limit for active jobs and stages): - val retainedStages = conf.getInt("spark.ui.retainedStages", DEFAULT_RETAINED_STAGES) - val retainedJobs = conf.getInt("spark.ui.retainedJobs", DEFAULT_RETAINED_JOBS) + val retainedStages = conf.getInt("spark.ui.retainedStages", SparkUI.DEFAULT_RETAINED_STAGES) + val retainedJobs = conf.getInt("spark.ui.retainedJobs", SparkUI.DEFAULT_RETAINED_JOBS) // We can test for memory leaks by ensuring that collections that track non-active jobs and // stages do not grow without bound and that collections for active jobs/stages eventually become @@ -288,8 +287,8 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging { activeStages(stage.stageId) = stage pendingStages.remove(stage.stageId) val poolName = Option(stageSubmitted.properties).map { - p => p.getProperty("spark.scheduler.pool", DEFAULT_POOL_NAME) - }.getOrElse(DEFAULT_POOL_NAME) + p => p.getProperty("spark.scheduler.pool", SparkUI.DEFAULT_POOL_NAME) + }.getOrElse(SparkUI.DEFAULT_POOL_NAME) stageIdToInfo(stage.stageId) = stage val stageData = stageIdToData.getOrElseUpdate((stage.stageId, stage.attemptId), new StageUIData) @@ -524,9 +523,3 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging { startTime = appStarted.time } } - -private object JobProgressListener { - val DEFAULT_POOL_NAME = "default" - val DEFAULT_RETAINED_STAGES = 1000 - val DEFAULT_RETAINED_JOBS = 1000 -} http://git-wip-us.apache.org/repos/asf/spark/blob/863ec0cb/core/src/main/scala/org/apache/spark/ui/jobs/JobsTab.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobsTab.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobsTab.scala index 342787f..77ca60b 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/JobsTab.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobsTab.scala @@ -24,10 +24,12 @@ import org.apache.spark.ui.{SparkUI, SparkUITab} private[ui] class JobsTab(parent: SparkUI) extends SparkUITab(parent, "jobs") { val sc = parent.sc val killEnabled = parent.killEnabled - def isFairScheduler: Boolean = - jobProgresslistener.schedulingMode.exists(_ == SchedulingMode.FAIR) val jobProgresslistener = parent.jobProgressListener val executorListener = parent.executorsListener + val operationGraphListener = parent.operationGraphListener + + def isFairScheduler: Boolean = + jobProgresslistener.schedulingMode.exists(_ == SchedulingMode.FAIR) attachPage(new AllJobsPage(this)) attachPage(new JobPage(this)) http://git-wip-us.apache.org/repos/asf/spark/blob/863ec0cb/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala index f47cdc9..d725b9d 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala @@ -27,7 +27,7 @@ import org.apache.spark.ui.{WebUIPage, UIUtils} /** Page showing specific pool details */ private[ui] class PoolPage(parent: StagesTab) extends WebUIPage("pool") { private val sc = parent.sc - private val listener = parent.listener + private val listener = parent.progressListener def render(request: HttpServletRequest): Seq[Node] = { listener.synchronized { @@ -40,7 +40,7 @@ private[ui] class PoolPage(parent: StagesTab) extends WebUIPage("pool") { case None => Seq[StageInfo]() } val activeStagesTable = new StageTableBase(activeStages.sortBy(_.submissionTime).reverse, - parent.basePath, parent.listener, isFairScheduler = parent.isFairScheduler, + parent.basePath, parent.progressListener, isFairScheduler = parent.isFairScheduler, killEnabled = parent.killEnabled) // For now, pool information is only accessible in live UIs http://git-wip-us.apache.org/repos/asf/spark/blob/863ec0cb/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala index df1899e..9ba2af5 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala @@ -25,7 +25,7 @@ import org.apache.spark.ui.UIUtils /** Table showing list of pools */ private[ui] class PoolTable(pools: Seq[Schedulable], parent: StagesTab) { - private val listener = parent.listener + private val listener = parent.progressListener def toNodeSeq: Seq[Node] = { listener.synchronized { http://git-wip-us.apache.org/repos/asf/spark/blob/863ec0cb/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala index 797c940..5793100 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala @@ -27,15 +27,17 @@ import org.apache.commons.lang3.StringEscapeUtils import org.apache.spark.executor.TaskMetrics import org.apache.spark.ui.{ToolTips, WebUIPage, UIUtils} import org.apache.spark.ui.jobs.UIData._ +import org.apache.spark.ui.scope.RDDOperationGraph import org.apache.spark.util.{Utils, Distribution} import org.apache.spark.scheduler.{AccumulableInfo, TaskInfo} /** Page showing statistics and task list for a given stage */ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") { - private val listener = parent.listener + private val progressListener = parent.progressListener + private val operationGraphListener = parent.operationGraphListener def render(request: HttpServletRequest): Seq[Node] = { - listener.synchronized { + progressListener.synchronized { val parameterId = request.getParameter("id") require(parameterId != null && parameterId.nonEmpty, "Missing id parameter") @@ -44,7 +46,7 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") { val stageId = parameterId.toInt val stageAttemptId = parameterAttempt.toInt - val stageDataOption = listener.stageIdToData.get((stageId, stageAttemptId)) + val stageDataOption = progressListener.stageIdToData.get((stageId, stageAttemptId)) if (stageDataOption.isEmpty || stageDataOption.get.taskData.isEmpty) { val content = @@ -60,7 +62,7 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") { val tasks = stageData.taskData.values.toSeq.sortBy(_.taskInfo.launchTime) val numCompleted = tasks.count(_.taskInfo.finished) - val accumulables = listener.stageIdToData((stageId, stageAttemptId)).accumulables + val accumulables = progressListener.stageIdToData((stageId, stageAttemptId)).accumulables val hasAccumulators = accumulables.size > 0 val summary = @@ -169,6 +171,9 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") { </div> </div> + val dagViz = UIUtils.showDagVizForStage( + stageId, operationGraphListener.getOperationGraphForStage(stageId)) + val accumulableHeaders: Seq[String] = Seq("Accumulable", "Value") def accumulableRow(acc: AccumulableInfo): Elem = <tr><td>{acc.name}</td><td>{acc.value}</td></tr> @@ -434,13 +439,15 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") { val content = summary ++ showAdditionalMetrics ++ + dagViz ++ <h4>Summary Metrics for {numCompleted} Completed Tasks</h4> ++ <div>{summaryTable.getOrElse("No tasks have reported metrics yet.")}</div> ++ <h4>Aggregated Metrics by Executor</h4> ++ executorTable.toNodeSeq ++ maybeAccumulableTable ++ <h4>Tasks</h4> ++ taskTable - UIUtils.headerSparkPage("Details for Stage %d".format(stageId), content, parent) + UIUtils.headerSparkPage( + "Details for Stage %d".format(stageId), content, parent, showVisualization = true) } } http://git-wip-us.apache.org/repos/asf/spark/blob/863ec0cb/core/src/main/scala/org/apache/spark/ui/jobs/StagesTab.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagesTab.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagesTab.scala index 1bd2d87..5516995 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StagesTab.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagesTab.scala @@ -26,19 +26,20 @@ import org.apache.spark.ui.{SparkUI, SparkUITab} private[ui] class StagesTab(parent: SparkUI) extends SparkUITab(parent, "stages") { val sc = parent.sc val killEnabled = parent.killEnabled - val listener = parent.jobProgressListener + val progressListener = parent.jobProgressListener + val operationGraphListener = parent.operationGraphListener attachPage(new AllStagesPage(this)) attachPage(new StagePage(this)) attachPage(new PoolPage(this)) - def isFairScheduler: Boolean = listener.schedulingMode.exists(_ == SchedulingMode.FAIR) + def isFairScheduler: Boolean = progressListener.schedulingMode.exists(_ == SchedulingMode.FAIR) def handleKillRequest(request: HttpServletRequest): Unit = { if (killEnabled && parent.securityManager.checkModifyPermissions(request.getRemoteUser)) { val killFlag = Option(request.getParameter("terminate")).getOrElse("false").toBoolean val stageId = Option(request.getParameter("id")).getOrElse("-1").toInt - if (stageId >= 0 && killFlag && listener.activeStages.contains(stageId)) { + if (stageId >= 0 && killFlag && progressListener.activeStages.contains(stageId)) { sc.get.cancelStage(stageId) } // Do a quick pause here to give Spark time to kill the stage so it shows up as http://git-wip-us.apache.org/repos/asf/spark/blob/863ec0cb/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala b/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala new file mode 100644 index 0000000..a18c193 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala @@ -0,0 +1,205 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.ui.scope + +import scala.collection.mutable +import scala.collection.mutable.ListBuffer + +import org.apache.spark.Logging +import org.apache.spark.scheduler.StageInfo +import org.apache.spark.storage.StorageLevel + +/** + * A representation of a generic cluster graph used for storing information on RDD operations. + * + * Each graph is defined with a set of edges and a root cluster, which may contain children + * nodes and children clusters. Additionally, a graph may also have edges that enter or exit + * the graph from nodes that belong to adjacent graphs. + */ +private[ui] case class RDDOperationGraph( + edges: Seq[RDDOperationEdge], + outgoingEdges: Seq[RDDOperationEdge], + incomingEdges: Seq[RDDOperationEdge], + rootCluster: RDDOperationCluster) + +/** A node in an RDDOperationGraph. This represents an RDD. */ +private[ui] case class RDDOperationNode(id: Int, name: String, cached: Boolean) + +/** + * A directed edge connecting two nodes in an RDDOperationGraph. + * This represents an RDD dependency. + */ +private[ui] case class RDDOperationEdge(fromId: Int, toId: Int) + +/** + * A cluster that groups nodes together in an RDDOperationGraph. + * + * This represents any grouping of RDDs, including operation scopes (e.g. textFile, flatMap), + * stages, jobs, or any higher level construct. A cluster may be nested inside of other clusters. + */ +private[ui] class RDDOperationCluster(val id: String, val name: String) { + private val _childNodes = new ListBuffer[RDDOperationNode] + private val _childClusters = new ListBuffer[RDDOperationCluster] + + def childNodes: Seq[RDDOperationNode] = _childNodes.iterator.toSeq + def childClusters: Seq[RDDOperationCluster] = _childClusters.iterator.toSeq + def attachChildNode(childNode: RDDOperationNode): Unit = { _childNodes += childNode } + def attachChildCluster(childCluster: RDDOperationCluster): Unit = { + _childClusters += childCluster + } + + /** Return all the nodes container in this cluster, including ones nested in other clusters. */ + def getAllNodes: Seq[RDDOperationNode] = { + _childNodes ++ _childClusters.flatMap(_.childNodes) + } +} + +private[ui] object RDDOperationGraph extends Logging { + + /** + * Construct a RDDOperationGraph for a given stage. + * + * The root cluster represents the stage, and all children clusters represent RDD operations. + * Each node represents an RDD, and each edge represents a dependency between two RDDs pointing + * from the parent to the child. + * + * This does not currently merge common operation scopes across stages. This may be worth + * supporting in the future if we decide to group certain stages within the same job under + * a common scope (e.g. part of a SQL query). + */ + def makeOperationGraph(stage: StageInfo): RDDOperationGraph = { + val edges = new ListBuffer[RDDOperationEdge] + val nodes = new mutable.HashMap[Int, RDDOperationNode] + val clusters = new mutable.HashMap[String, RDDOperationCluster] // indexed by cluster ID + + // Root cluster is the stage cluster + val stageClusterId = s"stage_${stage.stageId}" + val stageClusterName = s"Stage ${stage.stageId}" + + { if (stage.attemptId == 0) "" else s" (attempt ${stage.attemptId})" } + val rootCluster = new RDDOperationCluster(stageClusterId, stageClusterName) + + // Find nodes, edges, and operation scopes that belong to this stage + stage.rddInfos.foreach { rdd => + edges ++= rdd.parentIds.map { parentId => RDDOperationEdge(parentId, rdd.id) } + + // TODO: differentiate between the intention to cache an RDD and whether it's actually cached + val node = nodes.getOrElseUpdate( + rdd.id, RDDOperationNode(rdd.id, rdd.name, rdd.storageLevel != StorageLevel.NONE)) + + if (rdd.scope == null) { + // This RDD has no encompassing scope, so we put it directly in the root cluster + // This should happen only if an RDD is instantiated outside of a public RDD API + rootCluster.attachChildNode(node) + } else { + // Otherwise, this RDD belongs to an inner cluster, + // which may be nested inside of other clusters + val rddScopes = rdd.scope.map { scope => scope.getAllScopes }.getOrElse(Seq.empty) + val rddClusters = rddScopes.map { scope => + val clusterId = scope.name + "_" + scope.id + val clusterName = scope.name + clusters.getOrElseUpdate(clusterId, new RDDOperationCluster(clusterId, clusterName)) + } + // Build the cluster hierarchy for this RDD + rddClusters.sliding(2).foreach { pc => + if (pc.size == 2) { + val parentCluster = pc(0) + val childCluster = pc(1) + parentCluster.attachChildCluster(childCluster) + } + } + // Attach the outermost cluster to the root cluster, and the RDD to the innermost cluster + rddClusters.headOption.foreach { cluster => rootCluster.attachChildCluster(cluster) } + rddClusters.lastOption.foreach { cluster => cluster.attachChildNode(node) } + } + } + + // Classify each edge as internal, outgoing or incoming + // This information is needed to reason about how stages relate to each other + val internalEdges = new ListBuffer[RDDOperationEdge] + val outgoingEdges = new ListBuffer[RDDOperationEdge] + val incomingEdges = new ListBuffer[RDDOperationEdge] + edges.foreach { case e: RDDOperationEdge => + val fromThisGraph = nodes.contains(e.fromId) + val toThisGraph = nodes.contains(e.toId) + (fromThisGraph, toThisGraph) match { + case (true, true) => internalEdges += e + case (true, false) => outgoingEdges += e + case (false, true) => incomingEdges += e + // should never happen + case _ => logWarning(s"Found an orphan edge in stage ${stage.stageId}: $e") + } + } + + RDDOperationGraph(internalEdges, outgoingEdges, incomingEdges, rootCluster) + } + + /** + * Generate the content of a dot file that describes the specified graph. + * + * Note that this only uses a minimal subset of features available to the DOT specification. + * Part of the styling must be done here because the rendering library must take certain + * attributes into account when arranging the graph elements. More style is added in the + * visualization later through post-processing in JavaScript. + * + * For the complete DOT specification, see http://www.graphviz.org/Documentation/dotguide.pdf. + */ + def makeDotFile(graph: RDDOperationGraph, forJob: Boolean): String = { + val dotFile = new StringBuilder + dotFile.append("digraph G {\n") + dotFile.append(makeDotSubgraph(graph.rootCluster, forJob, indent = " ")) + graph.edges.foreach { edge => + dotFile.append(s""" ${edge.fromId}->${edge.toId} [lineInterpolate="basis"];\n""") + } + dotFile.append("}") + val result = dotFile.toString() + logDebug(result) + result + } + + /** + * Return the dot representation of a node in an RDDOperationGraph. + * + * On the job page, is displayed as a small circle without labels. + * On the stage page, it is displayed as a box with an embedded label. + */ + private def makeDotNode(node: RDDOperationNode, forJob: Boolean): String = { + if (forJob) { + s"""${node.id} [label=" " shape="circle" padding="5" labelStyle="font-size: 0"]""" + } else { + s"""${node.id} [label="${node.name} (${node.id})"]""" + } + } + + /** Return the dot representation of a subgraph in an RDDOperationGraph. */ + private def makeDotSubgraph( + scope: RDDOperationCluster, + forJob: Boolean, + indent: String): String = { + val subgraph = new StringBuilder + subgraph.append(indent + s"subgraph cluster${scope.id} {\n") + subgraph.append(indent + s""" label="${scope.name}";\n""") + scope.childNodes.foreach { node => + subgraph.append(indent + s" ${makeDotNode(node, forJob)};\n") + } + scope.childClusters.foreach { cscope => + subgraph.append(makeDotSubgraph(cscope, forJob, indent + " ")) + } + subgraph.append(indent + "}\n") + subgraph.toString() + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/863ec0cb/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraphListener.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraphListener.scala b/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraphListener.scala new file mode 100644 index 0000000..2884a49 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraphListener.scala @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.ui.scope + +import scala.collection.mutable + +import org.apache.spark.SparkConf +import org.apache.spark.scheduler._ +import org.apache.spark.ui.SparkUI + +/** + * A SparkListener that constructs a DAG of RDD operations. + */ +private[ui] class RDDOperationGraphListener(conf: SparkConf) extends SparkListener { + private val jobIdToStageIds = new mutable.HashMap[Int, Seq[Int]] + private val stageIdToGraph = new mutable.HashMap[Int, RDDOperationGraph] + private val stageIds = new mutable.ArrayBuffer[Int] + + // How many jobs or stages to retain graph metadata for + private val retainedStages = + conf.getInt("spark.ui.retainedStages", SparkUI.DEFAULT_RETAINED_STAGES) + + /** Return the graph metadata for the given stage, or None if no such information exists. */ + def getOperationGraphForJob(jobId: Int): Seq[RDDOperationGraph] = { + jobIdToStageIds.get(jobId) + .map { sids => sids.flatMap { sid => stageIdToGraph.get(sid) } } + .getOrElse { Seq.empty } + } + + /** Return the graph metadata for the given stage, or None if no such information exists. */ + def getOperationGraphForStage(stageId: Int): Option[RDDOperationGraph] = { + stageIdToGraph.get(stageId) + } + + /** On job start, construct a RDDOperationGraph for each stage in the job for display later. */ + override def onJobStart(jobStart: SparkListenerJobStart): Unit = synchronized { + val jobId = jobStart.jobId + val stageInfos = jobStart.stageInfos + + stageInfos.foreach { stageInfo => + stageIds += stageInfo.stageId + stageIdToGraph(stageInfo.stageId) = RDDOperationGraph.makeOperationGraph(stageInfo) + } + jobIdToStageIds(jobId) = stageInfos.map(_.stageId).sorted + + // Remove graph metadata for old stages + if (stageIds.size >= retainedStages) { + val toRemove = math.max(retainedStages / 10, 1) + stageIds.take(toRemove).foreach { id => stageIdToGraph.remove(id) } + stageIds.trimStart(toRemove) + } + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org