http://git-wip-us.apache.org/repos/asf/spark/blob/863ec0cb/core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.js ---------------------------------------------------------------------- diff --git a/core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.js b/core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.js new file mode 100644 index 0000000..99b0294 --- /dev/null +++ b/core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.js @@ -0,0 +1,392 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * This file contains the logic to render the RDD DAG visualization in the UI. + * + * This DAG describes the relationships between + * (1) an RDD and its dependencies, + * (2) an RDD and its operation scopes, and + * (3) an RDD's operation scopes and the stage / job hierarchy + * + * An operation scope is a general, named code block representing an operation + * that instantiates RDDs (e.g. filter, textFile, reduceByKey). An operation + * scope can be nested inside of other scopes if the corresponding RDD operation + * invokes other such operations (for more detail, see o.a.s.rdd.operationScope). + * + * A stage may include one or more operation scopes if the RDD operations are + * streamlined into one stage (e.g. rdd.map(...).filter(...).flatMap(...)). + * On the flip side, an operation scope may also include one or many stages, + * or even jobs if the RDD operation is higher level than Spark's scheduling + * primitives (e.g. take, any SQL query). + * + * In the visualization, an RDD is expressed as a node, and its dependencies + * as directed edges (from parent to child). operation scopes, stages, and + * jobs are expressed as clusters that may contain one or many nodes. These + * clusters may be nested inside of each other in the scenarios described + * above. + * + * The visualization is rendered in an SVG contained in "div#dag-viz-graph", + * and its input data is expected to be populated in "div#dag-viz-metadata" + * by Spark's UI code. This is currently used only on the stage page and on + * the job page. + * + * This requires jQuery, d3, and dagre-d3. Note that we use a custom release + * of dagre-d3 (http://github.com/andrewor14/dagre-d3) for some specific + * functionality. For more detail, please track the changes in that project + * since it was forked (commit 101503833a8ce5fe369547f6addf3e71172ce10b). + */ + +var VizConstants = { + rddColor: "#444444", + rddCachedColor: "#FF0000", + rddOperationColor: "#AADFFF", + stageColor: "#FFDDEE", + clusterLabelColor: "#888888", + edgeColor: "#444444", + edgeWidth: "1.5px", + svgMarginX: 0, + svgMarginY: 20, + stageSep: 50, + graphPrefix: "graph_", + nodePrefix: "node_", + stagePrefix: "stage_", + clusterPrefix: "cluster_", + stageClusterPrefix: "cluster_stage_" +}; + +// Helper d3 accessors for the elements that contain our graph and its metadata +function graphContainer() { return d3.select("#dag-viz-graph"); } +function metadataContainer() { return d3.select("#dag-viz-metadata"); } + +/* + * Show or hide the RDD DAG visualization. + * The graph is only rendered the first time this is called. + */ +function toggleDagViz(forJob) { + var arrowSelector = ".expand-dag-viz-arrow"; + $(arrowSelector).toggleClass('arrow-closed'); + $(arrowSelector).toggleClass('arrow-open'); + var shouldShow = $(arrowSelector).hasClass("arrow-open"); + if (shouldShow) { + var shouldRender = graphContainer().select("svg").empty(); + if (shouldRender) { + renderDagViz(forJob); + } + graphContainer().style("display", "block"); + } else { + // Save the graph for later so we don't have to render it again + graphContainer().style("display", "none"); + } +} + +/* + * Render the RDD DAG visualization. + * + * Input DOM hierarchy: + * div#dag-viz-metadata > + * div.stage-metadata > + * div.[dot-file | incoming-edge | outgoing-edge] + * + * Output DOM hierarchy: + * div#dag-viz-graph > + * svg > + * g#cluster_stage_[stageId] + * + * Note that the input metadata is populated by o.a.s.ui.UIUtils.showDagViz. + * Any changes in the input format here must be reflected there. + */ +function renderDagViz(forJob) { + + // If there is not a dot file to render, fail fast and report error + if (metadataContainer().empty()) { + graphContainer().append("div").text( + "No visualization information available for this " + (forJob ? "job" : "stage")); + return; + } + + var svg = graphContainer().append("svg"); + if (forJob) { + renderDagVizForJob(svg); + } else { + renderDagVizForStage(svg); + } + + // Find cached RDDs + metadataContainer().selectAll(".cached-rdd").each(function(v) { + var nodeId = VizConstants.nodePrefix + d3.select(this).text(); + graphContainer().selectAll("#" + nodeId).classed("cached", true); + }); + + // Set the appropriate SVG dimensions to ensure that all elements are displayed + var boundingBox = svg.node().getBBox(); + svg.style("width", (boundingBox.width + VizConstants.svgMarginX) + "px"); + svg.style("height", (boundingBox.height + VizConstants.svgMarginY) + "px"); + + // Add labels to clusters because dagre-d3 doesn't do this for us + svg.selectAll("g.cluster rect").each(function() { + var rect = d3.select(this); + var cluster = d3.select(this.parentNode); + // Shift the boxes up a little to make room for the labels + rect.attr("y", toFloat(rect.attr("y")) - 10); + rect.attr("height", toFloat(rect.attr("height")) + 10); + var labelX = toFloat(rect.attr("x")) + toFloat(rect.attr("width")) - 5; + var labelY = toFloat(rect.attr("y")) + 15; + var labelText = cluster.attr("name").replace(VizConstants.clusterPrefix, ""); + cluster.append("text") + .attr("x", labelX) + .attr("y", labelY) + .attr("text-anchor", "end") + .text(labelText); + }); + + // We have shifted a few elements upwards, so we should fix the SVG views + var startX = -VizConstants.svgMarginX; + var startY = -VizConstants.svgMarginY; + var endX = toFloat(svg.style("width")) + VizConstants.svgMarginX; + var endY = toFloat(svg.style("height")) + VizConstants.svgMarginY; + var newViewBox = startX + " " + startY + " " + endX + " " + endY; + svg.attr("viewBox", newViewBox); + + // Lastly, apply some custom style to the DAG + styleDagViz(forJob); +} + +/* Render the RDD DAG visualization for a stage. */ +function renderDagVizForStage(svgContainer) { + var metadata = metadataContainer().select(".stage-metadata"); + var dot = metadata.select(".dot-file").text(); + var containerId = VizConstants.graphPrefix + metadata.attr("stageId"); + var container = svgContainer.append("g").attr("id", containerId); + renderDot(dot, container); +} + +/* + * Render the RDD DAG visualization for a job. + * + * Due to limitations in dagre-d3, each stage is rendered independently so that + * we have more control on how to position them. Unfortunately, this means we + * cannot rely on dagre-d3 to render edges that cross stages and must render + * these manually on our own. + */ +function renderDagVizForJob(svgContainer) { + var crossStageEdges = []; + + metadataContainer().selectAll(".stage-metadata").each(function(d, i) { + var metadata = d3.select(this); + var dot = metadata.select(".dot-file").text(); + var stageId = metadata.attr("stageId"); + var containerId = VizConstants.graphPrefix + stageId; + // TODO: handle stage attempts + var stageLink = + "/stages/stage/?id=" + stageId.replace(VizConstants.stagePrefix, "") + "&attempt=0"; + var container = svgContainer + .append("a").attr("xlink:href", stageLink) + .append("g").attr("id", containerId); + // Now we need to shift the container for this stage so it doesn't overlap + // with existing ones. We do not need to do this for the first stage. + if (i > 0) { + // Take into account the position and width of the last stage's container + var existingStages = stageClusters(); + if (!existingStages.empty()) { + var lastStage = existingStages[0].pop(); + var lastStageId = d3.select(lastStage).attr("id"); + var lastStageWidth = toFloat(d3.select("#" + lastStageId + " rect").attr("width")); + var lastStagePosition = getAbsolutePosition(lastStageId); + var offset = lastStagePosition.x + lastStageWidth + VizConstants.stageSep; + container.attr("transform", "translate(" + offset + ", 0)"); + } + } + renderDot(dot, container); + // If there are any incoming edges into this graph, keep track of them to render + // them separately later. Note that we cannot draw them now because we need to + // put these edges in a separate container that is on top of all stage graphs. + metadata.selectAll(".incoming-edge").each(function(v) { + var edge = d3.select(this).text().split(","); // e.g. 3,4 => [3, 4] + crossStageEdges.push(edge); + }); + }); + + // Draw edges that cross stages + if (crossStageEdges.length > 0) { + var container = svgContainer.append("g").attr("id", "cross-stage-edges"); + for (var i = 0; i < crossStageEdges.length; i++) { + var fromRDDId = crossStageEdges[i][0]; + var toRDDId = crossStageEdges[i][1]; + connectRDDs(fromRDDId, toRDDId, container); + } + } +} + +/* Render the dot file as an SVG in the given container. */ +function renderDot(dot, container) { + var escaped_dot = dot + .replace(/</g, "<") + .replace(/>/g, ">") + .replace(/"/g, "\""); + var g = graphlibDot.read(escaped_dot); + var renderer = new dagreD3.render(); + renderer(container, g); +} + +/* Style the visualization we just rendered. */ +function styleDagViz(forJob) { + graphContainer().selectAll("svg g.cluster rect") + .style("fill", "white") + .style("stroke", VizConstants.rddOperationColor) + .style("stroke-width", "4px") + .style("stroke-opacity", "0.5"); + graphContainer().selectAll("svg g.cluster text") + .attr("fill", VizConstants.clusterLabelColor) + .attr("font-size", "11px"); + graphContainer().selectAll("svg path") + .style("stroke", VizConstants.edgeColor) + .style("stroke-width", VizConstants.edgeWidth); + stageClusters() + .select("rect") + .style("stroke", VizConstants.stageColor) + .style("strokeWidth", "6px"); + + // Put an arrow at the end of every edge + // We need to do this because we manually render some edges ourselves + // For these edges, we borrow the arrow marker generated by dagre-d3 + var dagreD3Marker = graphContainer().select("svg g.edgePaths marker").node(); + graphContainer().select("svg") + .append(function() { return dagreD3Marker.cloneNode(true); }) + .attr("id", "marker-arrow") + .select("path") + .attr("fill", VizConstants.edgeColor) + .attr("strokeWidth", "0px"); + graphContainer().selectAll("svg g > path").attr("marker-end", "url(#marker-arrow)"); + graphContainer().selectAll("svg g.edgePaths def").remove(); // We no longer need these + + // Apply any job or stage specific styles + if (forJob) { + styleDagVizForJob(); + } else { + styleDagVizForStage(); + } +} + +/* Apply job-page-specific style to the visualization. */ +function styleDagVizForJob() { + graphContainer().selectAll("svg g.node circle") + .style("fill", VizConstants.rddColor); + // TODO: add a legend to explain what a highlighted dot means + graphContainer().selectAll("svg g.cached circle") + .style("fill", VizConstants.rddCachedColor); + graphContainer().selectAll("svg g#cross-stage-edges path") + .style("fill", "none"); +} + +/* Apply stage-page-specific style to the visualization. */ +function styleDagVizForStage() { + graphContainer().selectAll("svg g.node rect") + .style("fill", "none") + .style("stroke", VizConstants.rddColor) + .style("stroke-width", "2px") + .attr("rx", "5") // round corners + .attr("ry", "5"); + // TODO: add a legend to explain what a highlighted RDD means + graphContainer().selectAll("svg g.cached rect") + .style("stroke", VizConstants.rddCachedColor); + graphContainer().selectAll("svg g.node g.label text tspan") + .style("fill", VizConstants.rddColor); +} + +/* + * (Job page only) Helper method to compute the absolute + * position of the group element identified by the given ID. + */ +function getAbsolutePosition(groupId) { + var obj = d3.select("#" + groupId).filter("g"); + var _x = 0, _y = 0; + while (!obj.empty()) { + var transformText = obj.attr("transform"); + var translate = d3.transform(transformText).translate + _x += translate[0]; + _y += translate[1]; + obj = d3.select(obj.node().parentNode).filter("g") + } + return { x: _x, y: _y }; +} + +/* (Job page only) Connect two RDD nodes with a curved edge. */ +function connectRDDs(fromRDDId, toRDDId, container) { + var fromNodeId = VizConstants.nodePrefix + fromRDDId; + var toNodeId = VizConstants.nodePrefix + toRDDId + var fromPos = getAbsolutePosition(fromNodeId); + var toPos = getAbsolutePosition(toNodeId); + + // On the job page, RDDs are rendered as dots (circles). When rendering the path, + // we need to account for the radii of these circles. Otherwise the arrow heads + // will bleed into the circle itself. + var delta = toFloat(graphContainer() + .select("g.node#" + toNodeId) + .select("circle") + .attr("r")); + if (fromPos.x < toPos.x) { + fromPos.x += delta; + toPos.x -= delta; + } else if (fromPos.x > toPos.x) { + fromPos.x -= delta; + toPos.x += delta; + } + + if (fromPos.y == toPos.y) { + // If they are on the same rank, curve the middle part of the edge + // upward a little to avoid interference with things in between + // e.g. _______ + // _____/ \_____ + var points = [ + [fromPos.x, fromPos.y], + [fromPos.x + (toPos.x - fromPos.x) * 0.2, fromPos.y], + [fromPos.x + (toPos.x - fromPos.x) * 0.3, fromPos.y - 20], + [fromPos.x + (toPos.x - fromPos.x) * 0.7, fromPos.y - 20], + [fromPos.x + (toPos.x - fromPos.x) * 0.8, toPos.y], + [toPos.x, toPos.y] + ]; + } else { + // Otherwise, draw a curved edge that flattens out on both ends + // e.g. _____ + // / + // | + // _____/ + var points = [ + [fromPos.x, fromPos.y], + [fromPos.x + (toPos.x - fromPos.x) * 0.4, fromPos.y], + [fromPos.x + (toPos.x - fromPos.x) * 0.6, toPos.y], + [toPos.x, toPos.y] + ]; + } + + var line = d3.svg.line().interpolate("basis"); + container.append("path").datum(points).attr("d", line); +} + +/* Helper d3 accessor to clusters that represent stages. */ +function stageClusters() { + return graphContainer().selectAll("g.cluster").filter(function() { + return d3.select(this).attr("id").indexOf(VizConstants.stageClusterPrefix) > -1; + }); +} + +/* Helper method to convert attributes to numeric values. */ +function toFloat(f) { + return parseFloat(f.replace(/px$/, "")); +} +
http://git-wip-us.apache.org/repos/asf/spark/blob/863ec0cb/core/src/main/resources/org/apache/spark/ui/static/webui.css ---------------------------------------------------------------------- diff --git a/core/src/main/resources/org/apache/spark/ui/static/webui.css b/core/src/main/resources/org/apache/spark/ui/static/webui.css index 4910744..669ad48 100644 --- a/core/src/main/resources/org/apache/spark/ui/static/webui.css +++ b/core/src/main/resources/org/apache/spark/ui/static/webui.css @@ -145,7 +145,7 @@ pre { border: none; } -span.expand-additional-metrics { +span.expand-additional-metrics, span.expand-dag-viz { cursor: pointer; } http://git-wip-us.apache.org/repos/asf/spark/blob/863ec0cb/core/src/main/scala/org/apache/spark/SparkContext.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 4ef9054..b98a54b 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -659,6 +659,14 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, null) } + /** + * Execute a block of code in a scope such that all new RDDs created in this body will + * be part of the same scope. For more detail, see {{org.apache.spark.rdd.RDDOperationScope}}. + * + * Note: Return statements are NOT allowed in the given body. + */ + private def withScope[U](body: => U): U = RDDOperationScope.withScope[U](this)(body) + // Methods for creating RDDs /** Distribute a local Scala collection to form an RDD. @@ -669,7 +677,9 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli * @note avoid using `parallelize(Seq())` to create an empty `RDD`. Consider `emptyRDD` for an * RDD with no partitions, or `parallelize(Seq[T]())` for an RDD of `T` with empty partitions. */ - def parallelize[T: ClassTag](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T] = { + def parallelize[T: ClassTag]( + seq: Seq[T], + numSlices: Int = defaultParallelism): RDD[T] = withScope { assertNotStopped() new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]()) } @@ -678,14 +688,16 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli * * This method is identical to `parallelize`. */ - def makeRDD[T: ClassTag](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T] = { + def makeRDD[T: ClassTag]( + seq: Seq[T], + numSlices: Int = defaultParallelism): RDD[T] = withScope { parallelize(seq, numSlices) } /** Distribute a local Scala collection to form an RDD, with one or more * location preferences (hostnames of Spark nodes) for each object. * Create a new partition for each collection item. */ - def makeRDD[T: ClassTag](seq: Seq[(T, Seq[String])]): RDD[T] = { + def makeRDD[T: ClassTag](seq: Seq[(T, Seq[String])]): RDD[T] = withScope { assertNotStopped() val indexToPrefs = seq.zipWithIndex.map(t => (t._2, t._1._2)).toMap new ParallelCollectionRDD[T](this, seq.map(_._1), seq.size, indexToPrefs) @@ -695,10 +707,12 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli * Read a text file from HDFS, a local file system (available on all nodes), or any * Hadoop-supported file system URI, and return it as an RDD of Strings. */ - def textFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String] = { + def textFile( + path: String, + minPartitions: Int = defaultMinPartitions): RDD[String] = withScope { assertNotStopped() hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text], - minPartitions).map(pair => pair._2.toString).setName(path) + minPartitions).map(pair => pair._2.toString) } /** @@ -728,8 +742,9 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli * * @param minPartitions A suggestion value of the minimal splitting number for input data. */ - def wholeTextFiles(path: String, minPartitions: Int = defaultMinPartitions): - RDD[(String, String)] = { + def wholeTextFiles( + path: String, + minPartitions: Int = defaultMinPartitions): RDD[(String, String)] = withScope { assertNotStopped() val job = new NewHadoopJob(hadoopConfiguration) // Use setInputPaths so that wholeTextFiles aligns with hadoopFile/textFile in taking @@ -776,8 +791,9 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli * @note Small files are preferred; very large files may cause bad performance. */ @Experimental - def binaryFiles(path: String, minPartitions: Int = defaultMinPartitions): - RDD[(String, PortableDataStream)] = { + def binaryFiles( + path: String, + minPartitions: Int = defaultMinPartitions): RDD[(String, PortableDataStream)] = withScope { assertNotStopped() val job = new NewHadoopJob(hadoopConfiguration) // Use setInputPaths so that binaryFiles aligns with hadoopFile/textFile in taking @@ -806,8 +822,10 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli * @return An RDD of data with values, represented as byte arrays */ @Experimental - def binaryRecords(path: String, recordLength: Int, conf: Configuration = hadoopConfiguration) - : RDD[Array[Byte]] = { + def binaryRecords( + path: String, + recordLength: Int, + conf: Configuration = hadoopConfiguration): RDD[Array[Byte]] = withScope { assertNotStopped() conf.setInt(FixedLengthBinaryInputFormat.RECORD_LENGTH_PROPERTY, recordLength) val br = newAPIHadoopFile[LongWritable, BytesWritable, FixedLengthBinaryInputFormat](path, @@ -848,8 +866,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli inputFormatClass: Class[_ <: InputFormat[K, V]], keyClass: Class[K], valueClass: Class[V], - minPartitions: Int = defaultMinPartitions - ): RDD[(K, V)] = { + minPartitions: Int = defaultMinPartitions): RDD[(K, V)] = withScope { assertNotStopped() // Add necessary security credentials to the JobConf before broadcasting it. SparkHadoopUtil.get.addCredentials(conf) @@ -869,8 +886,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli inputFormatClass: Class[_ <: InputFormat[K, V]], keyClass: Class[K], valueClass: Class[V], - minPartitions: Int = defaultMinPartitions - ): RDD[(K, V)] = { + minPartitions: Int = defaultMinPartitions): RDD[(K, V)] = withScope { assertNotStopped() // A Hadoop configuration can be about 10 KB, which is pretty big, so broadcast it. val confBroadcast = broadcast(new SerializableWritable(hadoopConfiguration)) @@ -901,7 +917,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli */ def hadoopFile[K, V, F <: InputFormat[K, V]] (path: String, minPartitions: Int) - (implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)] = { + (implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)] = withScope { hadoopFile(path, fm.runtimeClass.asInstanceOf[Class[F]], km.runtimeClass.asInstanceOf[Class[K]], @@ -924,13 +940,14 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli * copy them using a `map` function. */ def hadoopFile[K, V, F <: InputFormat[K, V]](path: String) - (implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)] = + (implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)] = withScope { hadoopFile[K, V, F](path, defaultMinPartitions) + } /** Get an RDD for a Hadoop file with an arbitrary new API InputFormat. */ def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]] (path: String) - (implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)] = { + (implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)] = withScope { newAPIHadoopFile( path, fm.runtimeClass.asInstanceOf[Class[F]], @@ -953,7 +970,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli fClass: Class[F], kClass: Class[K], vClass: Class[V], - conf: Configuration = hadoopConfiguration): RDD[(K, V)] = { + conf: Configuration = hadoopConfiguration): RDD[(K, V)] = withScope { assertNotStopped() // The call to new NewHadoopJob automatically adds security credentials to conf, // so we don't need to explicitly add them ourselves @@ -987,7 +1004,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli conf: Configuration = hadoopConfiguration, fClass: Class[F], kClass: Class[K], - vClass: Class[V]): RDD[(K, V)] = { + vClass: Class[V]): RDD[(K, V)] = withScope { assertNotStopped() // Add necessary security credentials to the JobConf. Required to access secure HDFS. val jconf = new JobConf(conf) @@ -1007,7 +1024,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli keyClass: Class[K], valueClass: Class[V], minPartitions: Int - ): RDD[(K, V)] = { + ): RDD[(K, V)] = withScope { assertNotStopped() val inputFormatClass = classOf[SequenceFileInputFormat[K, V]] hadoopFile(path, inputFormatClass, keyClass, valueClass, minPartitions) @@ -1021,7 +1038,10 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli * If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first * copy them using a `map` function. * */ - def sequenceFile[K, V](path: String, keyClass: Class[K], valueClass: Class[V]): RDD[(K, V)] = { + def sequenceFile[K, V]( + path: String, + keyClass: Class[K], + valueClass: Class[V]): RDD[(K, V)] = withScope { assertNotStopped() sequenceFile(path, keyClass, valueClass, defaultMinPartitions) } @@ -1051,16 +1071,17 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli def sequenceFile[K, V] (path: String, minPartitions: Int = defaultMinPartitions) (implicit km: ClassTag[K], vm: ClassTag[V], - kcf: () => WritableConverter[K], vcf: () => WritableConverter[V]) - : RDD[(K, V)] = { - assertNotStopped() - val kc = kcf() - val vc = vcf() - val format = classOf[SequenceFileInputFormat[Writable, Writable]] - val writables = hadoopFile(path, format, + kcf: () => WritableConverter[K], vcf: () => WritableConverter[V]): RDD[(K, V)] = { + withScope { + assertNotStopped() + val kc = kcf() + val vc = vcf() + val format = classOf[SequenceFileInputFormat[Writable, Writable]] + val writables = hadoopFile(path, format, kc.writableClass(km).asInstanceOf[Class[Writable]], vc.writableClass(vm).asInstanceOf[Class[Writable]], minPartitions) - writables.map { case (k, v) => (kc.convert(k), vc.convert(v)) } + writables.map { case (k, v) => (kc.convert(k), vc.convert(v)) } + } } /** @@ -1073,21 +1094,18 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli */ def objectFile[T: ClassTag]( path: String, - minPartitions: Int = defaultMinPartitions - ): RDD[T] = { + minPartitions: Int = defaultMinPartitions): RDD[T] = withScope { assertNotStopped() sequenceFile(path, classOf[NullWritable], classOf[BytesWritable], minPartitions) .flatMap(x => Utils.deserialize[Array[T]](x._2.getBytes, Utils.getContextOrSparkClassLoader)) } - protected[spark] def checkpointFile[T: ClassTag]( - path: String - ): RDD[T] = { + protected[spark] def checkpointFile[T: ClassTag](path: String): RDD[T] = withScope { new CheckpointRDD[T](this, path) } /** Build the union of a list of RDDs. */ - def union[T: ClassTag](rdds: Seq[RDD[T]]): RDD[T] = { + def union[T: ClassTag](rdds: Seq[RDD[T]]): RDD[T] = withScope { val partitioners = rdds.flatMap(_.partitioner).toSet if (rdds.forall(_.partitioner.isDefined) && partitioners.size == 1) { new PartitionerAwareUnionRDD(this, rdds) @@ -1097,8 +1115,9 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli } /** Build the union of a list of RDDs passed as variable-length arguments. */ - def union[T: ClassTag](first: RDD[T], rest: RDD[T]*): RDD[T] = + def union[T: ClassTag](first: RDD[T], rest: RDD[T]*): RDD[T] = withScope { union(Seq(first) ++ rest) + } /** Get an RDD that has no partitions or elements. */ def emptyRDD[T: ClassTag]: EmptyRDD[T] = new EmptyRDD[T](this) @@ -2060,10 +2079,10 @@ object SparkContext extends Logging { } private[spark] val SPARK_JOB_DESCRIPTION = "spark.job.description" - private[spark] val SPARK_JOB_GROUP_ID = "spark.jobGroup.id" - private[spark] val SPARK_JOB_INTERRUPT_ON_CANCEL = "spark.job.interruptOnCancel" + private[spark] val RDD_SCOPE_KEY = "spark.rdd.scope" + private[spark] val RDD_SCOPE_NO_OVERRIDE_KEY = "spark.rdd.scope.noOverride" /** * Executor id for the driver. In earlier versions of Spark, this was `<driver>`, but this was http://git-wip-us.apache.org/repos/asf/spark/blob/863ec0cb/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala b/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala index 3406a7e..ec18534 100644 --- a/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala +++ b/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala @@ -33,7 +33,7 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Loggi /** * Returns a future for counting the number of elements in the RDD. */ - def countAsync(): FutureAction[Long] = { + def countAsync(): FutureAction[Long] = self.withScope { val totalCount = new AtomicLong self.context.submitJob( self, @@ -53,7 +53,7 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Loggi /** * Returns a future for retrieving all elements of this RDD. */ - def collectAsync(): FutureAction[Seq[T]] = { + def collectAsync(): FutureAction[Seq[T]] = self.withScope { val results = new Array[Array[T]](self.partitions.length) self.context.submitJob[T, Array[T], Seq[T]](self, _.toArray, Range(0, self.partitions.length), (index, data) => results(index) = data, results.flatten.toSeq) @@ -62,7 +62,7 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Loggi /** * Returns a future for retrieving the first num elements of the RDD. */ - def takeAsync(num: Int): FutureAction[Seq[T]] = { + def takeAsync(num: Int): FutureAction[Seq[T]] = self.withScope { val f = new ComplexFutureAction[Seq[T]] f.run { @@ -109,7 +109,7 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Loggi /** * Applies a function f to all elements of this RDD. */ - def foreachAsync(f: T => Unit): FutureAction[Unit] = { + def foreachAsync(f: T => Unit): FutureAction[Unit] = self.withScope { val cleanF = self.context.clean(f) self.context.submitJob[T, Unit, Unit](self, _.foreach(cleanF), Range(0, self.partitions.length), (index, data) => Unit, Unit) @@ -118,7 +118,7 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Loggi /** * Applies a function f to each partition of this RDD. */ - def foreachPartitionAsync(f: Iterator[T] => Unit): FutureAction[Unit] = { + def foreachPartitionAsync(f: Iterator[T] => Unit): FutureAction[Unit] = self.withScope { self.context.submitJob[T, Unit, Unit](self, f, Range(0, self.partitions.length), (index, data) => Unit, Unit) } http://git-wip-us.apache.org/repos/asf/spark/blob/863ec0cb/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala index 843a893..926bce6 100644 --- a/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala +++ b/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala @@ -30,7 +30,7 @@ import org.apache.spark.util.StatCounter */ class DoubleRDDFunctions(self: RDD[Double]) extends Logging with Serializable { /** Add up the elements in this RDD. */ - def sum(): Double = { + def sum(): Double = self.withScope { self.fold(0.0)(_ + _) } @@ -38,37 +38,49 @@ class DoubleRDDFunctions(self: RDD[Double]) extends Logging with Serializable { * Return a [[org.apache.spark.util.StatCounter]] object that captures the mean, variance and * count of the RDD's elements in one operation. */ - def stats(): StatCounter = { + def stats(): StatCounter = self.withScope { self.mapPartitions(nums => Iterator(StatCounter(nums))).reduce((a, b) => a.merge(b)) } /** Compute the mean of this RDD's elements. */ - def mean(): Double = stats().mean + def mean(): Double = self.withScope { + stats().mean + } /** Compute the variance of this RDD's elements. */ - def variance(): Double = stats().variance + def variance(): Double = self.withScope { + stats().variance + } /** Compute the standard deviation of this RDD's elements. */ - def stdev(): Double = stats().stdev + def stdev(): Double = self.withScope { + stats().stdev + } /** * Compute the sample standard deviation of this RDD's elements (which corrects for bias in * estimating the standard deviation by dividing by N-1 instead of N). */ - def sampleStdev(): Double = stats().sampleStdev + def sampleStdev(): Double = self.withScope { + stats().sampleStdev + } /** * Compute the sample variance of this RDD's elements (which corrects for bias in * estimating the variance by dividing by N-1 instead of N). */ - def sampleVariance(): Double = stats().sampleVariance + def sampleVariance(): Double = self.withScope { + stats().sampleVariance + } /** * :: Experimental :: * Approximate operation to return the mean within a timeout. */ @Experimental - def meanApprox(timeout: Long, confidence: Double = 0.95): PartialResult[BoundedDouble] = { + def meanApprox( + timeout: Long, + confidence: Double = 0.95): PartialResult[BoundedDouble] = self.withScope { val processPartition = (ctx: TaskContext, ns: Iterator[Double]) => StatCounter(ns) val evaluator = new MeanEvaluator(self.partitions.length, confidence) self.context.runApproximateJob(self, processPartition, evaluator, timeout) @@ -79,7 +91,9 @@ class DoubleRDDFunctions(self: RDD[Double]) extends Logging with Serializable { * Approximate operation to return the sum within a timeout. */ @Experimental - def sumApprox(timeout: Long, confidence: Double = 0.95): PartialResult[BoundedDouble] = { + def sumApprox( + timeout: Long, + confidence: Double = 0.95): PartialResult[BoundedDouble] = self.withScope { val processPartition = (ctx: TaskContext, ns: Iterator[Double]) => StatCounter(ns) val evaluator = new SumEvaluator(self.partitions.length, confidence) self.context.runApproximateJob(self, processPartition, evaluator, timeout) @@ -93,7 +107,7 @@ class DoubleRDDFunctions(self: RDD[Double]) extends Logging with Serializable { * If the RDD contains infinity, NaN throws an exception * If the elements in RDD do not vary (max == min) always returns a single bucket. */ - def histogram(bucketCount: Int): Pair[Array[Double], Array[Long]] = { + def histogram(bucketCount: Int): Pair[Array[Double], Array[Long]] = self.withScope { // Scala's built-in range has issues. See #SI-8782 def customRange(min: Double, max: Double, steps: Int): IndexedSeq[Double] = { val span = max - min @@ -140,7 +154,9 @@ class DoubleRDDFunctions(self: RDD[Double]) extends Logging with Serializable { * the maximum value of the last position and all NaN entries will be counted * in that bucket. */ - def histogram(buckets: Array[Double], evenBuckets: Boolean = false): Array[Long] = { + def histogram( + buckets: Array[Double], + evenBuckets: Boolean = false): Array[Long] = self.withScope { if (buckets.length < 2) { throw new IllegalArgumentException("buckets array must have at least two elements") } http://git-wip-us.apache.org/repos/asf/spark/blob/863ec0cb/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala index f77abac..2cefe63 100644 --- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala @@ -99,7 +99,7 @@ private[spark] class HadoopPartition(rddId: Int, idx: Int, @transient s: InputSp */ @DeveloperApi class HadoopRDD[K, V]( - sc: SparkContext, + @transient sc: SparkContext, broadcastedConf: Broadcast[SerializableWritable[Configuration]], initLocalJobConfFuncOpt: Option[JobConf => Unit], inputFormatClass: Class[_ <: InputFormat[K, V]], @@ -108,6 +108,10 @@ class HadoopRDD[K, V]( minPartitions: Int) extends RDD[(K, V)](sc, Nil) with Logging { + if (initLocalJobConfFuncOpt.isDefined) { + sc.clean(initLocalJobConfFuncOpt.get) + } + def this( sc: SparkContext, conf: JobConf, http://git-wip-us.apache.org/repos/asf/spark/blob/863ec0cb/core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala index 6afe501..d71bb63 100644 --- a/core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala +++ b/core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala @@ -57,7 +57,7 @@ class OrderedRDDFunctions[K : Ordering : ClassTag, */ // TODO: this currently doesn't work on P other than Tuple2! def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length) - : RDD[(K, V)] = + : RDD[(K, V)] = self.withScope { val part = new RangePartitioner(numPartitions, self, ascending) new ShuffledRDD[K, V, V](self, part) @@ -71,7 +71,7 @@ class OrderedRDDFunctions[K : Ordering : ClassTag, * This is more efficient than calling `repartition` and then sorting within each partition * because it can push the sorting down into the shuffle machinery. */ - def repartitionAndSortWithinPartitions(partitioner: Partitioner): RDD[(K, V)] = { + def repartitionAndSortWithinPartitions(partitioner: Partitioner): RDD[(K, V)] = self.withScope { new ShuffledRDD[K, V, V](self, partitioner).setKeyOrdering(ordering) } @@ -81,7 +81,7 @@ class OrderedRDDFunctions[K : Ordering : ClassTag, * performed efficiently by only scanning the partitions that might contain matching elements. * Otherwise, a standard `filter` is applied to all partitions. */ - def filterByRange(lower: K, upper: K): RDD[P] = { + def filterByRange(lower: K, upper: K): RDD[P] = self.withScope { def inRange(k: K): Boolean = ordering.gteq(k, lower) && ordering.lteq(k, upper) http://git-wip-us.apache.org/repos/asf/spark/blob/863ec0cb/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala index 05351ba..93d338f 100644 --- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala +++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala @@ -29,7 +29,7 @@ import scala.util.DynamicVariable import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus import org.apache.hadoop.conf.{Configurable, Configuration} -import org.apache.hadoop.fs.{FileSystem, Path} +import org.apache.hadoop.fs.FileSystem import org.apache.hadoop.io.SequenceFile.CompressionType import org.apache.hadoop.io.compress.CompressionCodec import org.apache.hadoop.mapred.{FileOutputCommitter, FileOutputFormat, JobConf, OutputFormat} @@ -75,7 +75,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) mergeCombiners: (C, C) => C, partitioner: Partitioner, mapSideCombine: Boolean = true, - serializer: Serializer = null): RDD[(K, C)] = { + serializer: Serializer = null): RDD[(K, C)] = self.withScope { require(mergeCombiners != null, "mergeCombiners must be defined") // required as of Spark 0.9.0 if (keyClass.isArray) { if (mapSideCombine) { @@ -108,7 +108,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, - numPartitions: Int): RDD[(K, C)] = { + numPartitions: Int): RDD[(K, C)] = self.withScope { combineByKey(createCombiner, mergeValue, mergeCombiners, new HashPartitioner(numPartitions)) } @@ -122,7 +122,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * instead of creating a new U. */ def aggregateByKey[U: ClassTag](zeroValue: U, partitioner: Partitioner)(seqOp: (U, V) => U, - combOp: (U, U) => U): RDD[(K, U)] = { + combOp: (U, U) => U): RDD[(K, U)] = self.withScope { // Serialize the zero value to a byte array so that we can get a new clone of it on each key val zeroBuffer = SparkEnv.get.serializer.newInstance().serialize(zeroValue) val zeroArray = new Array[Byte](zeroBuffer.limit) @@ -144,7 +144,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * instead of creating a new U. */ def aggregateByKey[U: ClassTag](zeroValue: U, numPartitions: Int)(seqOp: (U, V) => U, - combOp: (U, U) => U): RDD[(K, U)] = { + combOp: (U, U) => U): RDD[(K, U)] = self.withScope { aggregateByKey(zeroValue, new HashPartitioner(numPartitions))(seqOp, combOp) } @@ -158,7 +158,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * instead of creating a new U. */ def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U, - combOp: (U, U) => U): RDD[(K, U)] = { + combOp: (U, U) => U): RDD[(K, U)] = self.withScope { aggregateByKey(zeroValue, defaultPartitioner(self))(seqOp, combOp) } @@ -167,7 +167,9 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * may be added to the result an arbitrary number of times, and must not change the result * (e.g., Nil for list concatenation, 0 for addition, or 1 for multiplication.). */ - def foldByKey(zeroValue: V, partitioner: Partitioner)(func: (V, V) => V): RDD[(K, V)] = { + def foldByKey( + zeroValue: V, + partitioner: Partitioner)(func: (V, V) => V): RDD[(K, V)] = self.withScope { // Serialize the zero value to a byte array so that we can get a new clone of it on each key val zeroBuffer = SparkEnv.get.serializer.newInstance().serialize(zeroValue) val zeroArray = new Array[Byte](zeroBuffer.limit) @@ -185,7 +187,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * may be added to the result an arbitrary number of times, and must not change the result * (e.g., Nil for list concatenation, 0 for addition, or 1 for multiplication.). */ - def foldByKey(zeroValue: V, numPartitions: Int)(func: (V, V) => V): RDD[(K, V)] = { + def foldByKey(zeroValue: V, numPartitions: Int)(func: (V, V) => V): RDD[(K, V)] = self.withScope { foldByKey(zeroValue, new HashPartitioner(numPartitions))(func) } @@ -194,7 +196,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * may be added to the result an arbitrary number of times, and must not change the result * (e.g., Nil for list concatenation, 0 for addition, or 1 for multiplication.). */ - def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)] = { + def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)] = self.withScope { foldByKey(zeroValue, defaultPartitioner(self))(func) } @@ -213,7 +215,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) */ def sampleByKey(withReplacement: Boolean, fractions: Map[K, Double], - seed: Long = Utils.random.nextLong): RDD[(K, V)] = { + seed: Long = Utils.random.nextLong): RDD[(K, V)] = self.withScope { require(fractions.values.forall(v => v >= 0.0), "Negative sampling rates.") @@ -242,9 +244,10 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * @return RDD containing the sampled subset */ @Experimental - def sampleByKeyExact(withReplacement: Boolean, + def sampleByKeyExact( + withReplacement: Boolean, fractions: Map[K, Double], - seed: Long = Utils.random.nextLong): RDD[(K, V)] = { + seed: Long = Utils.random.nextLong): RDD[(K, V)] = self.withScope { require(fractions.values.forall(v => v >= 0.0), "Negative sampling rates.") @@ -261,7 +264,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * the merging locally on each mapper before sending results to a reducer, similarly to a * "combiner" in MapReduce. */ - def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = { + def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = self.withScope { combineByKey[V]((v: V) => v, func, func, partitioner) } @@ -270,7 +273,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * the merging locally on each mapper before sending results to a reducer, similarly to a * "combiner" in MapReduce. Output will be hash-partitioned with numPartitions partitions. */ - def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)] = { + def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)] = self.withScope { reduceByKey(new HashPartitioner(numPartitions), func) } @@ -280,7 +283,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * "combiner" in MapReduce. Output will be hash-partitioned with the existing partitioner/ * parallelism level. */ - def reduceByKey(func: (V, V) => V): RDD[(K, V)] = { + def reduceByKey(func: (V, V) => V): RDD[(K, V)] = self.withScope { reduceByKey(defaultPartitioner(self), func) } @@ -289,7 +292,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * immediately to the master as a Map. This will also perform the merging locally on each mapper * before sending results to a reducer, similarly to a "combiner" in MapReduce. */ - def reduceByKeyLocally(func: (V, V) => V): Map[K, V] = { + def reduceByKeyLocally(func: (V, V) => V): Map[K, V] = self.withScope { if (keyClass.isArray) { throw new SparkException("reduceByKeyLocally() does not support array keys") @@ -317,7 +320,9 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) /** Alias for reduceByKeyLocally */ @deprecated("Use reduceByKeyLocally", "1.0.0") - def reduceByKeyToDriver(func: (V, V) => V): Map[K, V] = reduceByKeyLocally(func) + def reduceByKeyToDriver(func: (V, V) => V): Map[K, V] = self.withScope { + reduceByKeyLocally(func) + } /** * Count the number of elements for each key, collecting the results to a local Map. @@ -327,7 +332,9 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * To handle very large results, consider using rdd.mapValues(_ => 1L).reduceByKey(_ + _), which * returns an RDD[T, Long] instead of a map. */ - def countByKey(): Map[K, Long] = self.mapValues(_ => 1L).reduceByKey(_ + _).collect().toMap + def countByKey(): Map[K, Long] = self.withScope { + self.mapValues(_ => 1L).reduceByKey(_ + _).collect().toMap + } /** * :: Experimental :: @@ -336,7 +343,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) */ @Experimental def countByKeyApprox(timeout: Long, confidence: Double = 0.95) - : PartialResult[Map[K, BoundedDouble]] = { + : PartialResult[Map[K, BoundedDouble]] = self.withScope { self.map(_._1).countByValueApprox(timeout, confidence) } @@ -360,7 +367,10 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * @param partitioner Partitioner to use for the resulting RDD. */ @Experimental - def countApproxDistinctByKey(p: Int, sp: Int, partitioner: Partitioner): RDD[(K, Long)] = { + def countApproxDistinctByKey( + p: Int, + sp: Int, + partitioner: Partitioner): RDD[(K, Long)] = self.withScope { require(p >= 4, s"p ($p) must be >= 4") require(sp <= 32, s"sp ($sp) must be <= 32") require(sp == 0 || p <= sp, s"p ($p) cannot be greater than sp ($sp)") @@ -392,7 +402,9 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * It must be greater than 0.000017. * @param partitioner partitioner of the resulting RDD */ - def countApproxDistinctByKey(relativeSD: Double, partitioner: Partitioner): RDD[(K, Long)] = { + def countApproxDistinctByKey( + relativeSD: Double, + partitioner: Partitioner): RDD[(K, Long)] = self.withScope { require(relativeSD > 0.000017, s"accuracy ($relativeSD) must be greater than 0.000017") val p = math.ceil(2.0 * math.log(1.054 / relativeSD) / math.log(2)).toInt assert(p <= 32) @@ -410,7 +422,9 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * It must be greater than 0.000017. * @param numPartitions number of partitions of the resulting RDD */ - def countApproxDistinctByKey(relativeSD: Double, numPartitions: Int): RDD[(K, Long)] = { + def countApproxDistinctByKey( + relativeSD: Double, + numPartitions: Int): RDD[(K, Long)] = self.withScope { countApproxDistinctByKey(relativeSD, new HashPartitioner(numPartitions)) } @@ -424,7 +438,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * @param relativeSD Relative accuracy. Smaller values create counters that require more space. * It must be greater than 0.000017. */ - def countApproxDistinctByKey(relativeSD: Double = 0.05): RDD[(K, Long)] = { + def countApproxDistinctByKey(relativeSD: Double = 0.05): RDD[(K, Long)] = self.withScope { countApproxDistinctByKey(relativeSD, defaultPartitioner(self)) } @@ -441,7 +455,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * Note: As currently implemented, groupByKey must be able to hold all the key-value pairs for any * key in memory. If a key has too many values, it can result in an [[OutOfMemoryError]]. */ - def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] = { + def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] = self.withScope { // groupByKey shouldn't use map side combine because map side combine does not // reduce the amount of data shuffled and requires all map side data be inserted // into a hash table, leading to more objects in the old gen. @@ -465,14 +479,14 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * Note: As currently implemented, groupByKey must be able to hold all the key-value pairs for any * key in memory. If a key has too many values, it can result in an [[OutOfMemoryError]]. */ - def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])] = { + def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])] = self.withScope { groupByKey(new HashPartitioner(numPartitions)) } /** * Return a copy of the RDD partitioned using the specified partitioner. */ - def partitionBy(partitioner: Partitioner): RDD[(K, V)] = { + def partitionBy(partitioner: Partitioner): RDD[(K, V)] = self.withScope { if (keyClass.isArray && partitioner.isInstanceOf[HashPartitioner]) { throw new SparkException("Default partitioner cannot partition array keys.") } @@ -488,7 +502,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and * (k, v2) is in `other`. Uses the given Partitioner to partition the output RDD. */ - def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = { + def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = self.withScope { this.cogroup(other, partitioner).flatMapValues( pair => for (v <- pair._1.iterator; w <- pair._2.iterator) yield (v, w) ) @@ -500,7 +514,9 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * pair (k, (v, None)) if no elements in `other` have key k. Uses the given Partitioner to * partition the output RDD. */ - def leftOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, Option[W]))] = { + def leftOuterJoin[W]( + other: RDD[(K, W)], + partitioner: Partitioner): RDD[(K, (V, Option[W]))] = self.withScope { this.cogroup(other, partitioner).flatMapValues { pair => if (pair._2.isEmpty) { pair._1.iterator.map(v => (v, None)) @@ -517,7 +533,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * partition the output RDD. */ def rightOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner) - : RDD[(K, (Option[V], W))] = { + : RDD[(K, (Option[V], W))] = self.withScope { this.cogroup(other, partitioner).flatMapValues { pair => if (pair._1.isEmpty) { pair._2.iterator.map(w => (None, w)) @@ -536,7 +552,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * in `this` have key k. Uses the given Partitioner to partition the output RDD. */ def fullOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner) - : RDD[(K, (Option[V], Option[W]))] = { + : RDD[(K, (Option[V], Option[W]))] = self.withScope { this.cogroup(other, partitioner).flatMapValues { case (vs, Seq()) => vs.iterator.map(v => (Some(v), None)) case (Seq(), ws) => ws.iterator.map(w => (None, Some(w))) @@ -549,7 +565,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * existing partitioner/parallelism level. */ def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C) - : RDD[(K, C)] = { + : RDD[(K, C)] = self.withScope { combineByKey(createCombiner, mergeValue, mergeCombiners, defaultPartitioner(self)) } @@ -563,7 +579,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * aggregation (such as a sum or average) over each key, using [[PairRDDFunctions.aggregateByKey]] * or [[PairRDDFunctions.reduceByKey]] will provide much better performance. */ - def groupByKey(): RDD[(K, Iterable[V])] = { + def groupByKey(): RDD[(K, Iterable[V])] = self.withScope { groupByKey(defaultPartitioner(self)) } @@ -572,7 +588,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and * (k, v2) is in `other`. Performs a hash join across the cluster. */ - def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))] = { + def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))] = self.withScope { join(other, defaultPartitioner(self, other)) } @@ -581,7 +597,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and * (k, v2) is in `other`. Performs a hash join across the cluster. */ - def join[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, W))] = { + def join[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, W))] = self.withScope { join(other, new HashPartitioner(numPartitions)) } @@ -591,7 +607,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * pair (k, (v, None)) if no elements in `other` have key k. Hash-partitions the output * using the existing partitioner/parallelism level. */ - def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))] = { + def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))] = self.withScope { leftOuterJoin(other, defaultPartitioner(self, other)) } @@ -601,7 +617,9 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * pair (k, (v, None)) if no elements in `other` have key k. Hash-partitions the output * into `numPartitions` partitions. */ - def leftOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, Option[W]))] = { + def leftOuterJoin[W]( + other: RDD[(K, W)], + numPartitions: Int): RDD[(K, (V, Option[W]))] = self.withScope { leftOuterJoin(other, new HashPartitioner(numPartitions)) } @@ -611,7 +629,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * pair (k, (None, w)) if no elements in `this` have key k. Hash-partitions the resulting * RDD using the existing partitioner/parallelism level. */ - def rightOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], W))] = { + def rightOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], W))] = self.withScope { rightOuterJoin(other, defaultPartitioner(self, other)) } @@ -621,7 +639,9 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * pair (k, (None, w)) if no elements in `this` have key k. Hash-partitions the resulting * RDD into the given number of partitions. */ - def rightOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Option[V], W))] = { + def rightOuterJoin[W]( + other: RDD[(K, W)], + numPartitions: Int): RDD[(K, (Option[V], W))] = self.withScope { rightOuterJoin(other, new HashPartitioner(numPartitions)) } @@ -634,7 +654,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * in `this` have key k. Hash-partitions the resulting RDD using the existing partitioner/ * parallelism level. */ - def fullOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], Option[W]))] = { + def fullOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], Option[W]))] = self.withScope { fullOuterJoin(other, defaultPartitioner(self, other)) } @@ -646,7 +666,9 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * (k, (Some(v), Some(w))) for v in `this`, or the pair (k, (None, Some(w))) if no elements * in `this` have key k. Hash-partitions the resulting RDD into the given number of partitions. */ - def fullOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Option[V], Option[W]))] = { + def fullOuterJoin[W]( + other: RDD[(K, W)], + numPartitions: Int): RDD[(K, (Option[V], Option[W]))] = self.withScope { fullOuterJoin(other, new HashPartitioner(numPartitions)) } @@ -656,7 +678,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * Warning: this doesn't return a multimap (so if you have multiple values to the same key, only * one value per key is preserved in the map returned) */ - def collectAsMap(): Map[K, V] = { + def collectAsMap(): Map[K, V] = self.withScope { val data = self.collect() val map = new mutable.HashMap[K, V] map.sizeHint(data.length) @@ -668,7 +690,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * Pass each value in the key-value pair RDD through a map function without changing the keys; * this also retains the original RDD's partitioning. */ - def mapValues[U](f: V => U): RDD[(K, U)] = { + def mapValues[U](f: V => U): RDD[(K, U)] = self.withScope { val cleanF = self.context.clean(f) new MapPartitionsRDD[(K, U), (K, V)](self, (context, pid, iter) => iter.map { case (k, v) => (k, cleanF(v)) }, @@ -679,7 +701,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * Pass each value in the key-value pair RDD through a flatMap function without changing the * keys; this also retains the original RDD's partitioning. */ - def flatMapValues[U](f: V => TraversableOnce[U]): RDD[(K, U)] = { + def flatMapValues[U](f: V => TraversableOnce[U]): RDD[(K, U)] = self.withScope { val cleanF = self.context.clean(f) new MapPartitionsRDD[(K, U), (K, V)](self, (context, pid, iter) => iter.flatMap { case (k, v) => @@ -697,7 +719,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) other2: RDD[(K, W2)], other3: RDD[(K, W3)], partitioner: Partitioner) - : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2], Iterable[W3]))] = { + : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2], Iterable[W3]))] = self.withScope { if (partitioner.isInstanceOf[HashPartitioner] && keyClass.isArray) { throw new SparkException("Default partitioner cannot partition array keys.") } @@ -715,7 +737,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * list of values for that key in `this` as well as `other`. */ def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner) - : RDD[(K, (Iterable[V], Iterable[W]))] = { + : RDD[(K, (Iterable[V], Iterable[W]))] = self.withScope { if (partitioner.isInstanceOf[HashPartitioner] && keyClass.isArray) { throw new SparkException("Default partitioner cannot partition array keys.") } @@ -730,7 +752,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * tuple with the list of values for that key in `this`, `other1` and `other2`. */ def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], partitioner: Partitioner) - : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))] = { + : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))] = self.withScope { if (partitioner.isInstanceOf[HashPartitioner] && keyClass.isArray) { throw new SparkException("Default partitioner cannot partition array keys.") } @@ -748,7 +770,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * for that key in `this`, `other1`, `other2` and `other3`. */ def cogroup[W1, W2, W3](other1: RDD[(K, W1)], other2: RDD[(K, W2)], other3: RDD[(K, W3)]) - : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2], Iterable[W3]))] = { + : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2], Iterable[W3]))] = self.withScope { cogroup(other1, other2, other3, defaultPartitioner(self, other1, other2, other3)) } @@ -756,7 +778,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the * list of values for that key in `this` as well as `other`. */ - def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))] = { + def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))] = self.withScope { cogroup(other, defaultPartitioner(self, other)) } @@ -765,7 +787,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * tuple with the list of values for that key in `this`, `other1` and `other2`. */ def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)]) - : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))] = { + : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))] = self.withScope { cogroup(other1, other2, defaultPartitioner(self, other1, other2)) } @@ -773,7 +795,9 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the * list of values for that key in `this` as well as `other`. */ - def cogroup[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Iterable[V], Iterable[W]))] = { + def cogroup[W]( + other: RDD[(K, W)], + numPartitions: Int): RDD[(K, (Iterable[V], Iterable[W]))] = self.withScope { cogroup(other, new HashPartitioner(numPartitions)) } @@ -782,7 +806,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * tuple with the list of values for that key in `this`, `other1` and `other2`. */ def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], numPartitions: Int) - : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))] = { + : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))] = self.withScope { cogroup(other1, other2, new HashPartitioner(numPartitions)) } @@ -795,24 +819,24 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) other2: RDD[(K, W2)], other3: RDD[(K, W3)], numPartitions: Int) - : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2], Iterable[W3]))] = { + : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2], Iterable[W3]))] = self.withScope { cogroup(other1, other2, other3, new HashPartitioner(numPartitions)) } /** Alias for cogroup. */ - def groupWith[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))] = { + def groupWith[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))] = self.withScope { cogroup(other, defaultPartitioner(self, other)) } /** Alias for cogroup. */ def groupWith[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)]) - : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))] = { + : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))] = self.withScope { cogroup(other1, other2, defaultPartitioner(self, other1, other2)) } /** Alias for cogroup. */ def groupWith[W1, W2, W3](other1: RDD[(K, W1)], other2: RDD[(K, W2)], other3: RDD[(K, W3)]) - : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2], Iterable[W3]))] = { + : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2], Iterable[W3]))] = self.withScope { cogroup(other1, other2, other3, defaultPartitioner(self, other1, other2, other3)) } @@ -822,22 +846,27 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * Uses `this` partitioner/partition size, because even if `other` is huge, the resulting * RDD will be <= us. */ - def subtractByKey[W: ClassTag](other: RDD[(K, W)]): RDD[(K, V)] = + def subtractByKey[W: ClassTag](other: RDD[(K, W)]): RDD[(K, V)] = self.withScope { subtractByKey(other, self.partitioner.getOrElse(new HashPartitioner(self.partitions.length))) + } /** Return an RDD with the pairs from `this` whose keys are not in `other`. */ - def subtractByKey[W: ClassTag](other: RDD[(K, W)], numPartitions: Int): RDD[(K, V)] = + def subtractByKey[W: ClassTag]( + other: RDD[(K, W)], + numPartitions: Int): RDD[(K, V)] = self.withScope { subtractByKey(other, new HashPartitioner(numPartitions)) + } /** Return an RDD with the pairs from `this` whose keys are not in `other`. */ - def subtractByKey[W: ClassTag](other: RDD[(K, W)], p: Partitioner): RDD[(K, V)] = + def subtractByKey[W: ClassTag](other: RDD[(K, W)], p: Partitioner): RDD[(K, V)] = self.withScope { new SubtractedRDD[K, V, W](self, other, p) + } /** * Return the list of values in the RDD for key `key`. This operation is done efficiently if the * RDD has a known partitioner by only searching the partition that the key maps to. */ - def lookup(key: K): Seq[V] = { + def lookup(key: K): Seq[V] = self.withScope { self.partitioner match { case Some(p) => val index = p.getPartition(key) @@ -859,7 +888,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * Output the RDD to any Hadoop-supported file system, using a Hadoop `OutputFormat` class * supporting the key and value types K and V in this RDD. */ - def saveAsHadoopFile[F <: OutputFormat[K, V]](path: String)(implicit fm: ClassTag[F]) { + def saveAsHadoopFile[F <: OutputFormat[K, V]]( + path: String)(implicit fm: ClassTag[F]): Unit = self.withScope { saveAsHadoopFile(path, keyClass, valueClass, fm.runtimeClass.asInstanceOf[Class[F]]) } @@ -869,7 +899,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * supplied codec. */ def saveAsHadoopFile[F <: OutputFormat[K, V]]( - path: String, codec: Class[_ <: CompressionCodec]) (implicit fm: ClassTag[F]) { + path: String, + codec: Class[_ <: CompressionCodec])(implicit fm: ClassTag[F]): Unit = self.withScope { val runtimeClass = fm.runtimeClass saveAsHadoopFile(path, keyClass, valueClass, runtimeClass.asInstanceOf[Class[F]], codec) } @@ -878,7 +909,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * Output the RDD to any Hadoop-supported file system, using a new Hadoop API `OutputFormat` * (mapreduce.OutputFormat) object supporting the key and value types K and V in this RDD. */ - def saveAsNewAPIHadoopFile[F <: NewOutputFormat[K, V]](path: String)(implicit fm: ClassTag[F]) { + def saveAsNewAPIHadoopFile[F <: NewOutputFormat[K, V]]( + path: String)(implicit fm: ClassTag[F]): Unit = self.withScope { saveAsNewAPIHadoopFile(path, keyClass, valueClass, fm.runtimeClass.asInstanceOf[Class[F]]) } @@ -891,8 +923,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) keyClass: Class[_], valueClass: Class[_], outputFormatClass: Class[_ <: NewOutputFormat[_, _]], - conf: Configuration = self.context.hadoopConfiguration) - { + conf: Configuration = self.context.hadoopConfiguration): Unit = self.withScope { // Rename this as hadoopConf internally to avoid shadowing (see SPARK-2038). val hadoopConf = conf val job = new NewAPIHadoopJob(hadoopConf) @@ -912,7 +943,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) keyClass: Class[_], valueClass: Class[_], outputFormatClass: Class[_ <: OutputFormat[_, _]], - codec: Class[_ <: CompressionCodec]) { + codec: Class[_ <: CompressionCodec]): Unit = self.withScope { saveAsHadoopFile(path, keyClass, valueClass, outputFormatClass, new JobConf(self.context.hadoopConfiguration), Some(codec)) } @@ -927,7 +958,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) valueClass: Class[_], outputFormatClass: Class[_ <: OutputFormat[_, _]], conf: JobConf = new JobConf(self.context.hadoopConfiguration), - codec: Option[Class[_ <: CompressionCodec]] = None) { + codec: Option[Class[_ <: CompressionCodec]] = None): Unit = self.withScope { // Rename this as hadoopConf internally to avoid shadowing (see SPARK-2038). val hadoopConf = conf hadoopConf.setOutputKeyClass(keyClass) @@ -960,7 +991,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * output paths required (e.g. a table name to write to) in the same way as it would be * configured for a Hadoop MapReduce job. */ - def saveAsNewAPIHadoopDataset(conf: Configuration) { + def saveAsNewAPIHadoopDataset(conf: Configuration): Unit = self.withScope { // Rename this as hadoopConf internally to avoid shadowing (see SPARK-2038). val hadoopConf = conf val job = new NewAPIHadoopJob(hadoopConf) @@ -1027,7 +1058,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * (e.g. a table name to write to) in the same way as it would be configured for a Hadoop * MapReduce job. */ - def saveAsHadoopDataset(conf: JobConf) { + def saveAsHadoopDataset(conf: JobConf): Unit = self.withScope { // Rename this as hadoopConf internally to avoid shadowing (see SPARK-2038). val hadoopConf = conf val wrappedConf = new SerializableWritable(hadoopConf) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org