Repository: spark
Updated Branches:
  refs/heads/branch-1.6 6efe8b583 -> bcc871091


[SPARK-7970] Skip closure cleaning for SQL operations

Also introduces a new Spark-private API in RDD.scala, 'mapPartitionsInternal',
which skips closure cleaning of the supplied function.
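
The snippet below is an illustrative sketch only (not part of the patch); it
assumes a plain RDD[Int] named rdd and code compiled inside org.apache.spark,
where the new private[spark] method is visible:

    // Public API: the closure goes through SparkContext.clean(), which
    // reflectively prunes unused outer references and checks serializability.
    val cleanedPath = rdd.mapPartitions { iter => iter.map(_ + 1) }

    // New private[spark] API from this patch: the closure is handed straight
    // to MapPartitionsRDD with no ClosureCleaner pass. Safe only when the
    // caller already knows the closure is serializable, as SQL's physical
    // operators do.
    val internalPath = rdd.mapPartitionsInternal { iter => iter.map(_ + 1) }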

Author: nitin goyal <nitin.go...@guavus.com>
Author: nitin.goyal <nitin.go...@guavus.com>

Closes #9253 from nitin2goyal/master.

(cherry picked from commit c939c70ac1ab6a26d9fda0a99c4e837f7e5a7935)
Signed-off-by: Andrew Or <and...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bcc87109
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bcc87109
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bcc87109

Branch: refs/heads/branch-1.6
Commit: bcc871091b6acc4ee58f1c1eb5018740fd797d7f
Parents: 6efe8b5
Author: nitin goyal <nitin.go...@guavus.com>
Authored: Fri Nov 13 18:09:08 2015 -0800
Committer: Andrew Or <and...@databricks.com>
Committed: Fri Nov 13 18:09:15 2015 -0800

----------------------------------------------------------------------
 .../src/main/scala/org/apache/spark/rdd/RDD.scala | 18 ++++++++++++++++++
 .../sql/columnar/InMemoryColumnarTableScan.scala  |  4 ++--
 .../org/apache/spark/sql/execution/Exchange.scala |  6 +++---
 .../org/apache/spark/sql/execution/Generate.scala |  4 ++--
 .../execution/aggregate/SortBasedAggregate.scala  |  2 +-
 .../spark/sql/execution/basicOperators.scala      | 16 ++++++++--------
 .../joins/BroadcastLeftSemiJoinHash.scala         |  4 ++--
 .../sql/execution/joins/CartesianProduct.scala    |  2 +-
 .../org/apache/spark/sql/execution/sort.scala     |  2 +-
 9 files changed, 38 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/bcc87109/core/src/main/scala/org/apache/spark/rdd/RDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 800ef53..2aeb5ee 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -706,6 +706,24 @@ abstract class RDD[T: ClassTag](
   }
 
   /**
+   * [performance] Spark's internal mapPartitions method which skips closure cleaning. It is a
+   * performance API to be used carefully only if we are sure that the RDD elements are
+   * serializable and don't require closure cleaning.
+   *
+   * @param preservesPartitioning indicates whether the input function preserves the partitioner,
+   * which should be `false` unless this is a pair RDD and the input function doesn't modify
+   * the keys.
+   */
+  private[spark] def mapPartitionsInternal[U: ClassTag](
+      f: Iterator[T] => Iterator[U],
+      preservesPartitioning: Boolean = false): RDD[U] = withScope {
+    new MapPartitionsRDD(
+      this,
+      (context: TaskContext, index: Int, iter: Iterator[T]) => f(iter),
+      preservesPartitioning)
+  }
+
+  /**
    * Return a new RDD by applying a function to each partition of this RDD, while tracking the index
    * of the original partition.
    *
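
For comparison, the existing public mapPartitions in the same file differs
only in that it routes the closure through SparkContext.clean(); the sketch
below is quoted from memory of branch-1.6 and should be read as approximate:

    def mapPartitions[U: ClassTag](
        f: Iterator[T] => Iterator[U],
        preservesPartitioning: Boolean = false): RDD[U] = withScope {
      // ClosureCleaner pass plus serializability check -- the step the new
      // mapPartitionsInternal deliberately skips.
      val cleanedF = sc.clean(f)
      new MapPartitionsRDD(
        this,
        (context: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(iter),
        preservesPartitioning)
    }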

http://git-wip-us.apache.org/repos/asf/spark/blob/bcc87109/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
index 7eb1ad7..2cface6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
@@ -125,7 +125,7 @@ private[sql] case class InMemoryRelation(
 
   private def buildBuffers(): Unit = {
     val output = child.output
-    val cached = child.execute().mapPartitions { rowIterator =>
+    val cached = child.execute().mapPartitionsInternal { rowIterator =>
       new Iterator[CachedBatch] {
         def next(): CachedBatch = {
           val columnBuilders = output.map { attribute =>
@@ -292,7 +292,7 @@ private[sql] case class InMemoryColumnarTableScan(
     val relOutput = relation.output
     val buffers = relation.cachedColumnBuffers
 
-    buffers.mapPartitions { cachedBatchIterator =>
+    buffers.mapPartitionsInternal { cachedBatchIterator =>
       val partitionFilter = newPredicate(
         partitionFilters.reduceOption(And).getOrElse(Literal(true)),
         schema)

http://git-wip-us.apache.org/repos/asf/spark/blob/bcc87109/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
index bc252d9..a161cf0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
@@ -168,7 +168,7 @@ case class Exchange(
       case RangePartitioning(sortingExpressions, numPartitions) =>
        // Internally, RangePartitioner runs a job on the RDD that samples keys to compute
        // partition bounds. To get accurate samples, we need to copy the mutable keys.
-        val rddForSampling = rdd.mapPartitions { iter =>
+        val rddForSampling = rdd.mapPartitionsInternal { iter =>
           val mutablePair = new MutablePair[InternalRow, Null]()
           iter.map(row => mutablePair.update(row.copy(), null))
         }
@@ -200,12 +200,12 @@ case class Exchange(
     }
     val rddWithPartitionIds: RDD[Product2[Int, InternalRow]] = {
       if (needToCopyObjectsBeforeShuffle(part, serializer)) {
-        rdd.mapPartitions { iter =>
+        rdd.mapPartitionsInternal { iter =>
           val getPartitionKey = getPartitionKeyExtractor()
           iter.map { row => (part.getPartition(getPartitionKey(row)), row.copy()) }
         }
       } else {
-        rdd.mapPartitions { iter =>
+        rdd.mapPartitionsInternal { iter =>
           val getPartitionKey = getPartitionKeyExtractor()
           val mutablePair = new MutablePair[Int, InternalRow]()
           iter.map { row => mutablePair.update(part.getPartition(getPartitionKey(row)), row) }

http://git-wip-us.apache.org/repos/asf/spark/blob/bcc87109/sql/core/src/main/scala/org/apache/spark/sql/execution/Generate.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Generate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Generate.scala
index 78e33d9..54b8cb5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Generate.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Generate.scala
@@ -59,7 +59,7 @@ case class Generate(
   protected override def doExecute(): RDD[InternalRow] = {
     // boundGenerator.terminate() should be triggered after all of the rows in the partition
     if (join) {
-      child.execute().mapPartitions { iter =>
+      child.execute().mapPartitionsInternal { iter =>
         val generatorNullRow = InternalRow.fromSeq(Seq.fill[Any](generator.elementTypes.size)(null))
         val joinedRow = new JoinedRow
 
@@ -79,7 +79,7 @@ case class Generate(
         }
       }
     } else {
-      child.execute().mapPartitions { iter =>
+      child.execute().mapPartitionsInternal { iter =>
         iter.flatMap(row => boundGenerator.eval(row)) ++
         LazyIterator(() => boundGenerator.terminate())
       }

http://git-wip-us.apache.org/repos/asf/spark/blob/bcc87109/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregate.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregate.scala
index c8ccbb9..ee98245 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregate.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregate.scala
@@ -69,7 +69,7 @@ case class SortBasedAggregate(
   protected override def doExecute(): RDD[InternalRow] = attachTree(this, "execute") {
     val numInputRows = longMetric("numInputRows")
     val numOutputRows = longMetric("numOutputRows")
-    child.execute().mapPartitions { iter =>
+    child.execute().mapPartitionsInternal { iter =>
       // Because the constructor of an aggregation iterator will read at least the first row,
       // we need to get the value of iter.hasNext first.
       val hasInput = iter.hasNext

http://git-wip-us.apache.org/repos/asf/spark/blob/bcc87109/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
index ed82c9a..07925c6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
@@ -43,7 +43,7 @@ case class TungstenProject(projectList: Seq[NamedExpression], child: SparkPlan)
 
   protected override def doExecute(): RDD[InternalRow] = {
     val numRows = longMetric("numRows")
-    child.execute().mapPartitions { iter =>
+    child.execute().mapPartitionsInternal { iter =>
       val project = UnsafeProjection.create(projectList, child.output,
         subexpressionEliminationEnabled)
       iter.map { row =>
@@ -67,7 +67,7 @@ case class Filter(condition: Expression, child: SparkPlan) extends UnaryNode {
   protected override def doExecute(): RDD[InternalRow] = {
     val numInputRows = longMetric("numInputRows")
     val numOutputRows = longMetric("numOutputRows")
-    child.execute().mapPartitions { iter =>
+    child.execute().mapPartitionsInternal { iter =>
       val predicate = newPredicate(condition, child.output)
       iter.filter { row =>
         numInputRows += 1
@@ -161,11 +161,11 @@ case class Limit(limit: Int, child: SparkPlan)
 
   protected override def doExecute(): RDD[InternalRow] = {
     val rdd: RDD[_ <: Product2[Boolean, InternalRow]] = if (sortBasedShuffleOn) {
-      child.execute().mapPartitions { iter =>
+      child.execute().mapPartitionsInternal { iter =>
         iter.take(limit).map(row => (false, row.copy()))
       }
     } else {
-      child.execute().mapPartitions { iter =>
+      child.execute().mapPartitionsInternal { iter =>
         val mutablePair = new MutablePair[Boolean, InternalRow]()
         iter.take(limit).map(row => mutablePair.update(false, row))
       }
@@ -173,7 +173,7 @@ case class Limit(limit: Int, child: SparkPlan)
     val part = new HashPartitioner(1)
     val shuffled = new ShuffledRDD[Boolean, InternalRow, InternalRow](rdd, part)
     shuffled.setSerializer(new SparkSqlSerializer(child.sqlContext.sparkContext.getConf))
-    shuffled.mapPartitions(_.take(limit).map(_._2))
+    shuffled.mapPartitionsInternal(_.take(limit).map(_._2))
   }
 }
 
@@ -294,7 +294,7 @@ case class MapPartitions[T, U](
     child: SparkPlan) extends UnaryNode {
 
   override protected def doExecute(): RDD[InternalRow] = {
-    child.execute().mapPartitions { iter =>
+    child.execute().mapPartitionsInternal { iter =>
       val tBoundEncoder = tEncoder.bind(child.output)
       func(iter.map(tBoundEncoder.fromRow)).map(uEncoder.toRow)
     }
@@ -318,7 +318,7 @@ case class AppendColumns[T, U](
   override def output: Seq[Attribute] = child.output ++ newColumns
 
   override protected def doExecute(): RDD[InternalRow] = {
-    child.execute().mapPartitions { iter =>
+    child.execute().mapPartitionsInternal { iter =>
       val tBoundEncoder = tEncoder.bind(child.output)
       val combiner = GenerateUnsafeRowJoiner.create(tEncoder.schema, uEncoder.schema)
       iter.map { row =>
@@ -350,7 +350,7 @@ case class MapGroups[K, T, U](
     Seq(groupingAttributes.map(SortOrder(_, Ascending)))
 
   override protected def doExecute(): RDD[InternalRow] = {
-    child.execute().mapPartitions { iter =>
+    child.execute().mapPartitionsInternal { iter =>
       val grouped = GroupedIterator(iter, groupingAttributes, child.output)
       val groupKeyEncoder = kEncoder.bind(groupingAttributes)
       val groupDataEncoder = tEncoder.bind(child.output)

http://git-wip-us.apache.org/repos/asf/spark/blob/bcc87109/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastLeftSemiJoinHash.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastLeftSemiJoinHash.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastLeftSemiJoinHash.scala
index c5cd6a2..004407b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastLeftSemiJoinHash.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastLeftSemiJoinHash.scala
@@ -54,7 +54,7 @@ case class BroadcastLeftSemiJoinHash(
       val hashSet = buildKeyHashSet(input.toIterator, SQLMetrics.nullLongMetric)
       val broadcastedRelation = sparkContext.broadcast(hashSet)
 
-      left.execute().mapPartitions { streamIter =>
+      left.execute().mapPartitionsInternal { streamIter =>
         hashSemiJoin(streamIter, numLeftRows, broadcastedRelation.value, numOutputRows)
       }
     } else {
@@ -62,7 +62,7 @@ case class BroadcastLeftSemiJoinHash(
         HashedRelation(input.toIterator, SQLMetrics.nullLongMetric, rightKeyGenerator, input.size)
       val broadcastedRelation = sparkContext.broadcast(hashRelation)
 
-      left.execute().mapPartitions { streamIter =>
+      left.execute().mapPartitionsInternal { streamIter =>
         val hashedRelation = broadcastedRelation.value
         hashedRelation match {
           case unsafe: UnsafeHashedRelation =>

http://git-wip-us.apache.org/repos/asf/spark/blob/bcc87109/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala
index 0243e19..f467519 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala
@@ -46,7 +46,7 @@ case class CartesianProduct(left: SparkPlan, right: SparkPlan) extends BinaryNod
       row.copy()
     }
 
-    leftResults.cartesian(rightResults).mapPartitions { iter =>
+    leftResults.cartesian(rightResults).mapPartitionsInternal { iter =>
       val joinedRow = new JoinedRow
       iter.map { r =>
         numOutputRows += 1

http://git-wip-us.apache.org/repos/asf/spark/blob/bcc87109/sql/core/src/main/scala/org/apache/spark/sql/execution/sort.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/sort.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/sort.scala
index 47fe70a..52ef00e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/sort.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/sort.scala
@@ -47,7 +47,7 @@ case class Sort(
     if (global) OrderedDistribution(sortOrder) :: Nil else UnspecifiedDistribution :: Nil
 
   protected override def doExecute(): RDD[InternalRow] = attachTree(this, "sort") {
-    child.execute().mapPartitions( { iterator =>
+    child.execute().mapPartitionsInternal( { iterator =>
       val ordering = newOrdering(sortOrder, child.output)
       val sorter = new ExternalSorter[InternalRow, Null, InternalRow](
         TaskContext.get(), ordering = Some(ordering))

