Repository: spark
Updated Branches:
  refs/heads/master b779c9351 -> ee56fc343


http://git-wip-us.apache.org/repos/asf/spark/blob/ee56fc34/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
index c9a1514..78137d3 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
@@ -279,29 +279,30 @@ case class SampleExec(
 
   override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: 
ExprCode): String = {
     val numOutput = metricTerm(ctx, "numOutputRows")
-    val sampler = ctx.freshName("sampler")
 
     if (withReplacement) {
       val samplerClass = classOf[PoissonSampler[UnsafeRow]].getName
       val initSampler = ctx.freshName("initSampler")
 
-      val initSamplerFuncName = ctx.addNewFunction(initSampler,
-        s"""
-          | private void $initSampler() {
-          |   $sampler = new $samplerClass<UnsafeRow>($upperBound - 
$lowerBound, false);
-          |   java.util.Random random = new java.util.Random(${seed}L);
-          |   long randomSeed = random.nextLong();
-          |   int loopCount = 0;
-          |   while (loopCount < partitionIndex) {
-          |     randomSeed = random.nextLong();
-          |     loopCount += 1;
-          |   }
-          |   $sampler.setSeed(randomSeed);
-          | }
-         """.stripMargin.trim)
-
-      ctx.addMutableState(s"$samplerClass<UnsafeRow>", sampler,
-        s"$initSamplerFuncName();")
+      // inline mutable state since not many Sample operations in a task
+      val sampler = ctx.addMutableState(s"$samplerClass<UnsafeRow>", 
"sampleReplace",
+        v => {
+          val initSamplerFuncName = ctx.addNewFunction(initSampler,
+            s"""
+              | private void $initSampler() {
+              |   $v = new $samplerClass<UnsafeRow>($upperBound - $lowerBound, 
false);
+              |   java.util.Random random = new java.util.Random(${seed}L);
+              |   long randomSeed = random.nextLong();
+              |   int loopCount = 0;
+              |   while (loopCount < partitionIndex) {
+              |     randomSeed = random.nextLong();
+              |     loopCount += 1;
+              |   }
+              |   $v.setSeed(randomSeed);
+              | }
+           """.stripMargin.trim)
+          s"$initSamplerFuncName();"
+        }, forceInline = true)
 
       val samplingCount = ctx.freshName("samplingCount")
       s"""
@@ -313,10 +314,10 @@ case class SampleExec(
        """.stripMargin.trim
     } else {
       val samplerClass = classOf[BernoulliCellSampler[UnsafeRow]].getName
-      ctx.addMutableState(s"$samplerClass<UnsafeRow>", sampler,
-        s"""
-          | $sampler = new $samplerClass<UnsafeRow>($lowerBound, $upperBound, 
false);
-          | $sampler.setSeed(${seed}L + partitionIndex);
+      val sampler = ctx.addMutableState(s"$samplerClass<UnsafeRow>", "sampler",
+        v => s"""
+          | $v = new $samplerClass<UnsafeRow>($lowerBound, $upperBound, false);
+          | $v.setSeed(${seed}L + partitionIndex);
          """.stripMargin.trim)
 
       s"""
@@ -363,20 +364,18 @@ case class RangeExec(range: 
org.apache.spark.sql.catalyst.plans.logical.Range)
   protected override def doProduce(ctx: CodegenContext): String = {
     val numOutput = metricTerm(ctx, "numOutputRows")
 
-    val initTerm = ctx.freshName("initRange")
-    ctx.addMutableState(ctx.JAVA_BOOLEAN, initTerm, s"$initTerm = false;")
-    val number = ctx.freshName("number")
-    ctx.addMutableState(ctx.JAVA_LONG, number, s"$number = 0L;")
+    val initTerm = ctx.addMutableState(ctx.JAVA_BOOLEAN, "initRange")
+    val number = ctx.addMutableState(ctx.JAVA_LONG, "number")
 
     val value = ctx.freshName("value")
     val ev = ExprCode("", "false", value)
     val BigInt = classOf[java.math.BigInteger].getName
 
-    val taskContext = ctx.freshName("taskContext")
-    ctx.addMutableState("TaskContext", taskContext, s"$taskContext = 
TaskContext.get();")
-    val inputMetrics = ctx.freshName("inputMetrics")
-    ctx.addMutableState("InputMetrics", inputMetrics,
-        s"$inputMetrics = $taskContext.taskMetrics().inputMetrics();")
+    // inline mutable state since not many Range operations in a task
+    val taskContext = ctx.addMutableState("TaskContext", "taskContext",
+      v => s"$v = TaskContext.get();", forceInline = true)
+    val inputMetrics = ctx.addMutableState("InputMetrics", "inputMetrics",
+      v => s"$v = $taskContext.taskMetrics().inputMetrics();", forceInline = 
true)
 
     // In order to periodically update the metrics without inflicting 
performance penalty, this
     // operator produces elements in batches. After a batch is complete, the 
metrics are updated
@@ -386,12 +385,10 @@ case class RangeExec(range: 
org.apache.spark.sql.catalyst.plans.logical.Range)
     // the metrics.
 
     // Once number == batchEnd, it's time to progress to the next batch.
-    val batchEnd = ctx.freshName("batchEnd")
-    ctx.addMutableState(ctx.JAVA_LONG, batchEnd, s"$batchEnd = 0;")
+    val batchEnd = ctx.addMutableState(ctx.JAVA_LONG, "batchEnd")
 
     // How many values should still be generated by this range operator.
-    val numElementsTodo = ctx.freshName("numElementsTodo")
-    ctx.addMutableState(ctx.JAVA_LONG, numElementsTodo, s"$numElementsTodo = 
0L;")
+    val numElementsTodo = ctx.addMutableState(ctx.JAVA_LONG, "numElementsTodo")
 
     // How many values should be generated in the next batch.
     val nextBatchTodo = ctx.freshName("nextBatchTodo")
@@ -440,10 +437,6 @@ case class RangeExec(range: 
org.apache.spark.sql.catalyst.plans.logical.Range)
         | }
        """.stripMargin)
 
-    val input = ctx.freshName("input")
-    // Right now, Range is only used when there is one upstream.
-    ctx.addMutableState("scala.collection.Iterator", input, s"$input = 
inputs[0];")
-
     val localIdx = ctx.freshName("localIdx")
     val localEnd = ctx.freshName("localEnd")
     val range = ctx.freshName("range")

http://git-wip-us.apache.org/repos/asf/spark/blob/ee56fc34/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala
index ff5dd70..4f28eeb 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala
@@ -70,7 +70,6 @@ object GenerateColumnAccessor extends 
CodeGenerator[Seq[DataType], ColumnarItera
     val ctx = newCodeGenContext()
     val numFields = columnTypes.size
     val (initializeAccessors, extractors) = columnTypes.zipWithIndex.map { 
case (dt, index) =>
-      val accessorName = ctx.freshName("accessor")
       val accessorCls = dt match {
         case NullType => classOf[NullColumnAccessor].getName
         case BooleanType => classOf[BooleanColumnAccessor].getName
@@ -89,7 +88,7 @@ object GenerateColumnAccessor extends 
CodeGenerator[Seq[DataType], ColumnarItera
         case array: ArrayType => classOf[ArrayColumnAccessor].getName
         case t: MapType => classOf[MapColumnAccessor].getName
       }
-      ctx.addMutableState(accessorCls, accessorName)
+      val accessorName = ctx.addMutableState(accessorCls, "accessor")
 
       val createCode = dt match {
         case t if ctx.isPrimitiveType(dt) =>

http://git-wip-us.apache.org/repos/asf/spark/blob/ee56fc34/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala
index c96ed6e..ee763e2 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala
@@ -134,19 +134,18 @@ case class BroadcastHashJoinExec(
     // create a name for HashedRelation
     val broadcastRelation = buildPlan.executeBroadcast[HashedRelation]()
     val broadcast = ctx.addReferenceObj("broadcast", broadcastRelation)
-    val relationTerm = ctx.freshName("relation")
     val clsName = broadcastRelation.value.getClass.getName
 
     // At the end of the task, we update the avg hash probe.
     val avgHashProbe = metricTerm(ctx, "avgHashProbe")
-    val addTaskListener = genTaskListener(avgHashProbe, relationTerm)
 
-    ctx.addMutableState(clsName, relationTerm,
-      s"""
-         | $relationTerm = (($clsName) $broadcast.value()).asReadOnlyCopy();
-         | incPeakExecutionMemory($relationTerm.estimatedSize());
-         | $addTaskListener
-       """.stripMargin)
+    // inline mutable state since not many join operations in a task
+    val relationTerm = ctx.addMutableState(clsName, "relation",
+      v => s"""
+         | $v = (($clsName) $broadcast.value()).asReadOnlyCopy();
+         | incPeakExecutionMemory($v.estimatedSize());
+         | ${genTaskListener(avgHashProbe, v)}
+       """.stripMargin, forceInline = true)
     (broadcastRelation, relationTerm)
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/ee56fc34/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
index 554b731..0737304 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
@@ -422,10 +422,9 @@ case class SortMergeJoinExec(
    */
   private def genScanner(ctx: CodegenContext): (String, String) = {
     // Create class member for next row from both sides.
-    val leftRow = ctx.freshName("leftRow")
-    ctx.addMutableState("InternalRow", leftRow)
-    val rightRow = ctx.freshName("rightRow")
-    ctx.addMutableState("InternalRow", rightRow, s"$rightRow = null;")
+    // inline mutable state since not many join operations in a task
+    val leftRow = ctx.addMutableState("InternalRow", "leftRow", forceInline = 
true)
+    val rightRow = ctx.addMutableState("InternalRow", "rightRow", forceInline 
= true)
 
     // Create variables for join keys from both sides.
     val leftKeyVars = createJoinKey(ctx, leftRow, leftKeys, left.output)
@@ -436,14 +435,13 @@ case class SortMergeJoinExec(
     val rightKeyVars = copyKeys(ctx, rightKeyTmpVars)
 
     // A list to hold all matched rows from right side.
-    val matches = ctx.freshName("matches")
     val clsName = classOf[ExternalAppendOnlyUnsafeRowArray].getName
 
     val spillThreshold = getSpillThreshold
     val inMemoryThreshold = getInMemoryThreshold
 
-    ctx.addMutableState(clsName, matches,
-      s"$matches = new $clsName($inMemoryThreshold, $spillThreshold);")
+    val matches = ctx.addMutableState(clsName, "matches",
+      v => s"$v = new $clsName($inMemoryThreshold, $spillThreshold);")
     // Copy the left keys as class members so they could be used in next 
function call.
     val matchedKeyVars = copyKeys(ctx, leftKeyVars)
 
@@ -578,10 +576,11 @@ case class SortMergeJoinExec(
   override def needCopyResult: Boolean = true
 
   override def doProduce(ctx: CodegenContext): String = {
-    val leftInput = ctx.freshName("leftInput")
-    ctx.addMutableState("scala.collection.Iterator", leftInput, s"$leftInput = 
inputs[0];")
-    val rightInput = ctx.freshName("rightInput")
-    ctx.addMutableState("scala.collection.Iterator", rightInput, s"$rightInput 
= inputs[1];")
+    // inline mutable state since not many join operations in a task
+    val leftInput = ctx.addMutableState("scala.collection.Iterator", 
"leftInput",
+      v => s"$v = inputs[0];", forceInline = true)
+    val rightInput = ctx.addMutableState("scala.collection.Iterator", 
"rightInput",
+      v => s"$v = inputs[1];", forceInline = true)
 
     val (leftRow, matches) = genScanner(ctx)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/ee56fc34/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
index a8556f6..cccee63 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
@@ -71,8 +71,7 @@ trait BaseLimitExec extends UnaryExecNode with CodegenSupport 
{
   }
 
   override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: 
ExprCode): String = {
-    val stopEarly = ctx.freshName("stopEarly")
-    ctx.addMutableState(ctx.JAVA_BOOLEAN, stopEarly, s"$stopEarly = false;")
+    val stopEarly = ctx.addMutableState(ctx.JAVA_BOOLEAN, "stopEarly") // init 
as stopEarly = false
 
     ctx.addNewFunction("stopEarly", s"""
       @Override
@@ -80,8 +79,7 @@ trait BaseLimitExec extends UnaryExecNode with CodegenSupport 
{
         return $stopEarly;
       }
     """, inlineToOuterClass = true)
-    val countTerm = ctx.freshName("count")
-    ctx.addMutableState(ctx.JAVA_INT, countTerm, s"$countTerm = 0;")
+    val countTerm = ctx.addMutableState(ctx.JAVA_INT, "count") // init as 
count = 0
     s"""
        | if ($countTerm < $limit) {
        |   $countTerm += 1;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to