[GitHub] spark pull request #19869: [SPARK-22677][SQL] cleanup whole stage codegen fo...

2017-12-04 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/19869


---




[GitHub] spark pull request #19869: [SPARK-22677][SQL] cleanup whole stage codegen fo...

2017-12-04 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19869#discussion_r154682180
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala ---
@@ -596,7 +596,7 @@ case class HashAggregateExec(
         ctx.addMutableState(fastHashMapClassName, fastHashMapTerm,
           s"$fastHashMapTerm = new $fastHashMapClassName();")
         ctx.addMutableState(
-          classOf[java.util.Iterator[ColumnarRow]].getName,
+          s"java.util.Iterator<${classOf[ColumnarRow]}>",
--- End diff --

damn...


---




[GitHub] spark pull request #19869: [SPARK-22677][SQL] cleanup whole stage codegen fo...

2017-12-04 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/19869#discussion_r154580311
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala ---
@@ -596,7 +596,7 @@ case class HashAggregateExec(
         ctx.addMutableState(fastHashMapClassName, fastHashMapTerm,
           s"$fastHashMapTerm = new $fastHashMapClassName();")
         ctx.addMutableState(
-          classOf[java.util.Iterator[ColumnarRow]].getName,
+          s"java.util.Iterator<${classOf[ColumnarRow]}>",
--- End diff --

```scala
scala> s"java.util.Iterator<${classOf[ColumnarRow]}>"
res2: String = java.util.Iterator<class org.apache.spark.sql.execution.vectorized.ColumnarRow>
```
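
(Interpolating the `Class` object calls its `toString`, which prepends `class ` and yields invalid Java source. A sketch of one way to keep the type parameter, using `.getName` inside the interpolation — not necessarily the fix the PR settled on:)

```scala
scala> import org.apache.spark.sql.execution.vectorized.ColumnarRow
import org.apache.spark.sql.execution.vectorized.ColumnarRow

scala> s"java.util.Iterator<${classOf[ColumnarRow].getName}>"
res3: String = java.util.Iterator<org.apache.spark.sql.execution.vectorized.ColumnarRow>
```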


---




[GitHub] spark pull request #19869: [SPARK-22677][SQL] cleanup whole stage codegen fo...

2017-12-03 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/19869#discussion_r154576368
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala ---
@@ -573,94 +574,84 @@ case class HashAggregateExec(
       enableTwoLevelHashMap(ctx)
     } else {
       sqlContext.getConf("spark.sql.codegen.aggregate.map.vectorized.enable", null) match {
-        case "true" => logWarning("Two level hashmap is disabled but vectorized hashmap is " +
-          "enabled.")
-        case null | "" | "false" => None
+        case "true" =>
+          logWarning("Two level hashmap is disabled but vectorized hashmap is enabled.")
+        case _ =>
       }
     }
-    fastHashMapTerm = ctx.freshName("fastHashMap")
-    val fastHashMapClassName = ctx.freshName("FastHashMap")
-    val fastHashMapGenerator =
-      if (isVectorizedHashMapEnabled) {
-        new VectorizedHashMapGenerator(ctx, aggregateExpressions,
-          fastHashMapClassName, groupingKeySchema, bufferSchema)
-      } else {
-        new RowBasedHashMapGenerator(ctx, aggregateExpressions,
-          fastHashMapClassName, groupingKeySchema, bufferSchema)
-      }
 
     val thisPlan = ctx.addReferenceObj("plan", this)
 
-    // Create a name for iterator from vectorized HashMap
+    // Create a name for the iterator from the fast hash map.
     val iterTermForFastHashMap = ctx.freshName("fastHashMapIter")
     if (isFastHashMapEnabled) {
+      // Generates the fast hash map class and creates the fast hash map term.
+      fastHashMapTerm = ctx.freshName("fastHashMap")
+      val fastHashMapClassName = ctx.freshName("FastHashMap")
       if (isVectorizedHashMapEnabled) {
+        val generatedMap = new VectorizedHashMapGenerator(ctx, aggregateExpressions,
+          fastHashMapClassName, groupingKeySchema, bufferSchema).generate()
+        ctx.addInnerClass(generatedMap)
+
         ctx.addMutableState(fastHashMapClassName, fastHashMapTerm,
           s"$fastHashMapTerm = new $fastHashMapClassName();")
         ctx.addMutableState(
-          "java.util.Iterator<org.apache.spark.sql.execution.vectorized.ColumnarRow>",
+          classOf[java.util.Iterator[ColumnarRow]].getName,
--- End diff --

Is this the same as before?

```scala
scala> classOf[java.util.Iterator[Int]].getName
res2: String = java.util.Iterator
```
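
(The type parameter is erased on the JVM before it ever reaches the `Class` object, so any element type yields the same class — a quick REPL check:)

```scala
scala> classOf[java.util.Iterator[Int]] eq classOf[java.util.Iterator[String]]
res3: Boolean = true
```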




---




[GitHub] spark pull request #19869: [SPARK-22677][SQL] cleanup whole stage codegen fo...

2017-12-03 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/19869#discussion_r154569858
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala ---
@@ -768,12 +762,8 @@ case class HashAggregateExec(
 
     // generate hash code for key
     val hashExpr = Murmur3Hash(groupingExpressions, 42)
-    ctx.currentVars = input
     val hashEval = BindReferences.bindReference(hashExpr, child.output).genCode(ctx)
 
-    val inputAttr = aggregateBufferAttributes ++ child.output
-    ctx.currentVars = new Array[ExprCode](aggregateBufferAttributes.length) ++ input
-
     val (checkFallbackForGeneratedHashMap, checkFallbackForBytesToBytesMap, resetCounter,
       incCounter) = if (testFallbackStartsAt.isDefined) {
--- End diff --

ok, thanks


---




[GitHub] spark pull request #19869: [SPARK-22677][SQL] cleanup whole stage codegen fo...

2017-12-03 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19869#discussion_r154569741
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala ---
@@ -768,12 +762,8 @@ case class HashAggregateExec(
 
     // generate hash code for key
     val hashExpr = Murmur3Hash(groupingExpressions, 42)
-    ctx.currentVars = input
     val hashEval = BindReferences.bindReference(hashExpr, child.output).genCode(ctx)
 
-    val inputAttr = aggregateBufferAttributes ++ child.output
-    ctx.currentVars = new Array[ExprCode](aggregateBufferAttributes.length) ++ input
-
     val (checkFallbackForGeneratedHashMap, checkFallbackForBytesToBytesMap, resetCounter,
       incCounter) = if (testFallbackStartsAt.isDefined) {
--- End diff --

I'll have another PR for this part and will reformat it at that time.


---




[GitHub] spark pull request #19869: [SPARK-22677][SQL] cleanup whole stage codegen fo...

2017-12-03 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/19869#discussion_r154566445
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala ---
@@ -882,45 +851,65 @@ case class HashAggregateExec(
          |${evaluateVariables(unsafeRowBufferEvals)}
          |// update unsafe row buffer
          |${updateUnsafeRowBuffer.mkString("\n").trim}
-      """.stripMargin
+     """.stripMargin
     }
 
+    val updateRowInHashMap: String = {
+      if (isFastHashMapEnabled) {
+        ctx.INPUT_ROW = fastRowBuffer
+        val boundUpdateExpr = updateExpr.map(BindReferences.bindReference(_, inputAttr))
+        val subExprs = ctx.subexpressionEliminationForWholeStageCodegen(boundUpdateExpr)
+        val effectiveCodes = subExprs.codes.mkString("\n")
+        val fastRowEvals = ctx.withSubExprEliminationExprs(subExprs.states) {
+          boundUpdateExpr.map(_.genCode(ctx))
+        }
+        val updateFastRow = fastRowEvals.zipWithIndex.map { case (ev, i) =>
+          val dt = updateExpr(i).dataType
+          ctx.updateColumn(
+            fastRowBuffer, dt, i, ev, updateExpr(i).nullable, isVectorizedHashMapEnabled)
+        }
+
+        // If fast hash map is on, we first generate code to update row in fast hash map, if the
+        // previous lookup hit fast hash map. Otherwise, update row in regular hash map.
+        s"""
+           |if ($fastRowBuffer != null) {
+           |  // common sub-expressions
+           |  $effectiveCodes
+           |  // evaluate aggregate function
+           |  ${evaluateVariables(fastRowEvals)}
+           |  // update fast row
+           |  ${updateFastRow.mkString("\n").trim}
+           |} else {
+           |  $updateRowInRegularHashMap
+           |}
+         """.stripMargin
+      } else {
+        updateRowInRegularHashMap
+      }
+    }
+
+    val declareFastRowBuffer: String = if (isFastHashMapEnabled) {
+      val rowType = if (isVectorizedHashMapEnabled) {
+        classOf[MutableColumnarRow].getName
+      } else {
+        "UnsafeRow"
+      }
+      s"$rowType $fastRowBuffer = null;"
+    } else ""
 
     // We try to do hash map based in-memory aggregation first. If there is not enough memory (the
     // hash map will return null for new key), we spill the hash map to disk to free memory, then
     // continue to do in-memory aggregation and spilling until all the rows had been processed.
     // Finally, sort the spilled aggregate buffers by key, and merge them together for same key.
     s"""
       UnsafeRow $unsafeRowBuffer = null;
--- End diff --

nit: How about this, just for consistency?
```
s"""
 $declareRowBuffer

 $findOrInsertHashMap

 $incCounter

 $updateRowInHashMap
 """
```
Then,
```
val declareRowBuffer: String = if (isFastHashMapEnabled) {
  val rowType = if (isVectorizedHashMapEnabled) {
classOf[MutableColumnarRow].getName
  } else {
"UnsafeRow"
  }
  s"""
 |UnsafeRow $unsafeRowBuffer = null;
 |$rowType $fastRowBuffer = null;
   """.stripMargin
} else {
  s"UnsafeRow $unsafeRowBuffer = null;"
}
```


---




[GitHub] spark pull request #19869: [SPARK-22677][SQL] cleanup whole stage codegen fo...

2017-12-03 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/19869#discussion_r154565659
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala ---
@@ -768,12 +762,8 @@ case class HashAggregateExec(
 
     // generate hash code for key
     val hashExpr = Murmur3Hash(groupingExpressions, 42)
-    ctx.currentVars = input
     val hashEval = BindReferences.bindReference(hashExpr, child.output).genCode(ctx)
 
-    val inputAttr = aggregateBufferAttributes ++ child.output
-    ctx.currentVars = new Array[ExprCode](aggregateBufferAttributes.length) ++ input
-
     val (checkFallbackForGeneratedHashMap, checkFallbackForBytesToBytesMap, resetCounter,
       incCounter) = if (testFallbackStartsAt.isDefined) {
--- End diff --

nit: need some indentation at the start?


---




[GitHub] spark pull request #19869: [SPARK-22677][SQL] cleanup whole stage codegen fo...

2017-12-03 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/19869#discussion_r154561232
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala ---
@@ -621,34 +622,30 @@ case class HashAggregateExec(
     val iterTerm = ctx.freshName("mapIter")
     ctx.addMutableState(classOf[KVIterator[UnsafeRow, UnsafeRow]].getName, iterTerm)
 
-    def generateGenerateCode(): String = {
-      if (isFastHashMapEnabled) {
-        if (isVectorizedHashMapEnabled) {
-          s"""
-           | ${fastHashMapGenerator.asInstanceOf[VectorizedHashMapGenerator].generate()}
-          """.stripMargin
-        } else {
-          s"""
-           | ${fastHashMapGenerator.asInstanceOf[RowBasedHashMapGenerator].generate()}
-          """.stripMargin
-        }
-      } else ""
+    if (isFastHashMapEnabled) {
+      val generatedMap = if (isVectorizedHashMapEnabled) {
+        fastHashMapGenerator.asInstanceOf[VectorizedHashMapGenerator].generate()
+      } else {
+        fastHashMapGenerator.asInstanceOf[RowBasedHashMapGenerator].generate()
+      }
+      ctx.addInnerClass(generatedMap)
     }
-    ctx.addInnerClass(generateGenerateCode())
 
     val doAgg = ctx.freshName("doAggregateWithKeys")
     val peakMemory = metricTerm(ctx, "peakMemory")
     val spillSize = metricTerm(ctx, "spillSize")
     val avgHashProbe = metricTerm(ctx, "avgHashProbe")
+    val finishFashHashMap = if (isFastHashMapEnabled) {
--- End diff --

nit: can we merge this branch with the branch above (line 625) for the hashmap stuff?


---




[GitHub] spark pull request #19869: [SPARK-22677][SQL] cleanup whole stage codegen fo...

2017-12-03 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/19869#discussion_r154562413
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala ---
@@ -672,48 +668,56 @@ case class HashAggregateExec(
 
     def outputFromRowBasedMap: String = {
       s"""
-       while ($iterTermForFastHashMap.next()) {
-         $numOutput.add(1);
-         UnsafeRow $keyTerm = (UnsafeRow) $iterTermForFastHashMap.getKey();
-         UnsafeRow $bufferTerm = (UnsafeRow) $iterTermForFastHashMap.getValue();
-         $outputFunc($keyTerm, $bufferTerm);
-
-         if (shouldStop()) return;
-       }
-       $fastHashMapTerm.close();
-     """
+        |while ($iterTermForFastHashMap.next()) {
--- End diff --

super nit: can we also drop the unnecessary leading spaces (after the margin bar) elsewhere in this file? e.g.,
```
  s"""
 | private void $doAgg() throws java.io.IOException {
```
```
  s"""
 |private void $doAgg() throws java.io.IOException {
```

https://github.com/cloud-fan/spark/blob/9b8ae3d6635c5ed0323bf088e20d0de55dd1c098/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala#L233
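
(Background on the nit: `stripMargin` removes leading whitespace only up to and including the margin bar, so any space after the `|` survives into the generated Java — a quick check:)

```scala
scala> """| private void f()""".stripMargin == " private void f()"
res0: Boolean = true
```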


---




[GitHub] spark pull request #19869: [SPARK-22677][SQL] cleanup whole stage codegen fo...

2017-12-03 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19869#discussion_r154556638
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala ---
@@ -672,48 +668,56 @@ case class HashAggregateExec(
 
     def outputFromRowBasedMap: String = {
       s"""
-       while ($iterTermForFastHashMap.next()) {
-         $numOutput.add(1);
-         UnsafeRow $keyTerm = (UnsafeRow) $iterTermForFastHashMap.getKey();
-         UnsafeRow $bufferTerm = (UnsafeRow) $iterTermForFastHashMap.getValue();
-         $outputFunc($keyTerm, $bufferTerm);
-
-         if (shouldStop()) return;
-       }
-       $fastHashMapTerm.close();
-     """
+        |while ($iterTermForFastHashMap.next()) {
+        |  UnsafeRow $keyTerm = (UnsafeRow) $iterTermForFastHashMap.getKey();
+        |  UnsafeRow $bufferTerm = (UnsafeRow) $iterTermForFastHashMap.getValue();
+        |  $outputFunc($keyTerm, $bufferTerm);
+        |
+        |  if (shouldStop()) return;
+        |}
+        |$fastHashMapTerm.close();
+      """.stripMargin
     }
 
     // Iterate over the aggregate rows and convert them from ColumnarRow to UnsafeRow
     def outputFromVectorizedMap: String = {
       val row = ctx.freshName("fastHashMapRow")
--- End diff --

ah good catch!


---




[GitHub] spark pull request #19869: [SPARK-22677][SQL] cleanup whole stage codegen fo...

2017-12-03 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/19869#discussion_r154534333
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala ---
@@ -672,48 +668,56 @@ case class HashAggregateExec(
 
     def outputFromRowBasedMap: String = {
      s"""
-       while ($iterTermForFastHashMap.next()) {
-         $numOutput.add(1);
-         UnsafeRow $keyTerm = (UnsafeRow) $iterTermForFastHashMap.getKey();
-         UnsafeRow $bufferTerm = (UnsafeRow) $iterTermForFastHashMap.getValue();
-         $outputFunc($keyTerm, $bufferTerm);
-
-         if (shouldStop()) return;
-       }
-       $fastHashMapTerm.close();
-     """
+        |while ($iterTermForFastHashMap.next()) {
+        |  UnsafeRow $keyTerm = (UnsafeRow) $iterTermForFastHashMap.getKey();
+        |  UnsafeRow $bufferTerm = (UnsafeRow) $iterTermForFastHashMap.getValue();
+        |  $outputFunc($keyTerm, $bufferTerm);
+        |
+        |  if (shouldStop()) return;
+        |}
+        |$fastHashMapTerm.close();
+      """.stripMargin
     }
 
     // Iterate over the aggregate rows and convert them from ColumnarRow to UnsafeRow
     def outputFromVectorizedMap: String = {
       val row = ctx.freshName("fastHashMapRow")
--- End diff --

nit: Would it be better to fix the indentation of these three lines, too?


---




[GitHub] spark pull request #19869: [SPARK-22677][SQL] cleanup whole stage codegen fo...

2017-12-03 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19869#discussion_r154528578
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala ---
@@ -882,45 +851,65 @@ case class HashAggregateExec(
          |${evaluateVariables(unsafeRowBufferEvals)}
          |// update unsafe row buffer
          |${updateUnsafeRowBuffer.mkString("\n").trim}
-      """.stripMargin
+     """.stripMargin
     }
 
+    val updateRowInHashMap: String = {
+      if (isFastHashMapEnabled) {
+        ctx.INPUT_ROW = fastRowBuffer
+        val boundUpdateExpr = updateExpr.map(BindReferences.bindReference(_, inputAttr))
+        val subExprs = ctx.subexpressionEliminationForWholeStageCodegen(boundUpdateExpr)
+        val effectiveCodes = subExprs.codes.mkString("\n")
+        val fastRowEvals = ctx.withSubExprEliminationExprs(subExprs.states) {
+          boundUpdateExpr.map(_.genCode(ctx))
+        }
+        val updateFastRow = fastRowEvals.zipWithIndex.map { case (ev, i) =>
+          val dt = updateExpr(i).dataType
+          ctx.updateColumn(
+            fastRowBuffer, dt, i, ev, updateExpr(i).nullable, isVectorizedHashMapEnabled)
+        }
+
+        // If fast hash map is on, we first generate code to update row in fast hash map, if the
+        // previous lookup hit fast hash map. Otherwise, update row in regular hash map.
+        s"""
+           |if ($fastRowBuffer != null) {
+           |  // common sub-expressions
+           |  $effectiveCodes
+           |  // evaluate aggregate function
+           |  ${evaluateVariables(fastRowEvals)}
+           |  // update fast row
+           |  ${updateFastRow.mkString("\n").trim}
+           |} else {
+           |  $updateRowInRegularHashMap
+           |}
+         """.stripMargin
+      } else {
+        updateRowInRegularHashMap
--- End diff --

Previously we always declared the `fastRowBuffer` and had the `if (fastRowBuffer != null)` check. Now we don't generate them if the fast hash map is not enabled.
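
(A self-contained sketch of the resulting pattern; the names mirror the diff, the stub values are just for illustration:)

```scala
// Stubs standing in for the real codegen terms (illustration only).
val isFastHashMapEnabled = false
val fastRowBuffer = "agg_fastAggBuffer"
val updateRowInRegularHashMap = "/* update row in regular hash map */"

// Declaration and null check are now gated on the same flag, so with the
// fast hash map disabled neither is emitted; only the regular path remains.
val declareFastRowBuffer: String =
  if (isFastHashMapEnabled) s"UnsafeRow $fastRowBuffer = null;" else ""

val updateRowInHashMap: String =
  if (isFastHashMapEnabled) {
    s"""
       |if ($fastRowBuffer != null) {
       |  /* update the fast row */
       |} else {
       |  $updateRowInRegularHashMap
       |}
     """.stripMargin
  } else {
    updateRowInRegularHashMap
  }
```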


---




[GitHub] spark pull request #19869: [SPARK-22677][SQL] cleanup whole stage codegen fo...

2017-12-03 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19869#discussion_r154528498
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala ---
@@ -784,86 +774,65 @@ case class HashAggregateExec(
       ("true", "true", "", "")
     }
 
-    // We first generate code to probe and update the fast hash map. If the probe is
-    // successful the corresponding fast row buffer will hold the mutable row
-    val findOrInsertFastHashMap: Option[String] = {
+    val findOrInsertRegularHashMap: String =
+      s"""
+         |// generate grouping key
+         |${unsafeRowKeyCode.code.trim}
+         |${hashEval.code.trim}
+         |if ($checkFallbackForBytesToBytesMap) {
+         |  // try to get the buffer from hash map
+         |  $unsafeRowBuffer =
+         |    $hashMapTerm.getAggregationBufferFromUnsafeRow($unsafeRowKeys, ${hashEval.value});
+         |}
+         |// Can't allocate buffer from the hash map. Spill the map and fallback to sort-based
+         |// aggregation after processing all input rows.
+         |if ($unsafeRowBuffer == null) {
+         |  if ($sorterTerm == null) {
+         |    $sorterTerm = $hashMapTerm.destructAndCreateExternalSorter();
+         |  } else {
+         |    $sorterTerm.merge($hashMapTerm.destructAndCreateExternalSorter());
+         |  }
+         |  $resetCounter
+         |  // the hash map had been spilled, it should have enough memory now,
+         |  // try to allocate buffer again.
+         |  $unsafeRowBuffer = $hashMapTerm.getAggregationBufferFromUnsafeRow(
+         |    $unsafeRowKeys, ${hashEval.value});
+         |  if ($unsafeRowBuffer == null) {
+         |    // failed to allocate the first page
+         |    throw new OutOfMemoryError("No enough memory for aggregation");
+         |  }
+         |}
+       """.stripMargin
+
+    val findOrInsertHashMap: String = {
       if (isFastHashMapEnabled) {
-        Option(
-          s"""
-             |
-             |if ($checkFallbackForGeneratedHashMap) {
-             |  ${fastRowKeys.map(_.code).mkString("\n")}
-             |  if (${fastRowKeys.map("!" + _.isNull).mkString(" && ")}) {
-             |    $fastRowBuffer = $fastHashMapTerm.findOrInsert(
-             |      ${fastRowKeys.map(_.value).mkString(", ")});
-             |  }
-             |}
-           """.stripMargin)
+        // If fast hash map is on, we first generate code to probe and update the fast hash map.
+        // If the probe is successful the corresponding fast row buffer will hold the mutable row.
+        s"""
+           |if ($checkFallbackForGeneratedHashMap) {

moved from https://github.com/apache/spark/pull/19869/files#diff-2eb948516b5beaeb746aadac27fbd5b4L794


---




[GitHub] spark pull request #19869: [SPARK-22677][SQL] cleanup whole stage codegen fo...

2017-12-03 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19869#discussion_r154528455
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala ---
@@ -784,86 +774,65 @@ case class HashAggregateExec(
       ("true", "true", "", "")
     }
 
-    // We first generate code to probe and update the fast hash map. If the probe is
-    // successful the corresponding fast row buffer will hold the mutable row
-    val findOrInsertFastHashMap: Option[String] = {
+    val findOrInsertRegularHashMap: String =

moved from https://github.com/apache/spark/pull/19869/files#diff-2eb948516b5beaeb746aadac27fbd5b4L833


---




[GitHub] spark pull request #19869: [SPARK-22677][SQL] cleanup whole stage codegen fo...

2017-12-03 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19869#discussion_r154528431
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala ---
@@ -672,48 +668,56 @@ case class HashAggregateExec(
 
     def outputFromRowBasedMap: String = {
       s"""
-       while ($iterTermForFastHashMap.next()) {
-         $numOutput.add(1);
-         UnsafeRow $keyTerm = (UnsafeRow) $iterTermForFastHashMap.getKey();
-         UnsafeRow $bufferTerm = (UnsafeRow) $iterTermForFastHashMap.getValue();
-         $outputFunc($keyTerm, $bufferTerm);
-
-         if (shouldStop()) return;
-       }
-       $fastHashMapTerm.close();
-     """
+        |while ($iterTermForFastHashMap.next()) {
+        |  UnsafeRow $keyTerm = (UnsafeRow) $iterTermForFastHashMap.getKey();
+        |  UnsafeRow $bufferTerm = (UnsafeRow) $iterTermForFastHashMap.getValue();
+        |  $outputFunc($keyTerm, $bufferTerm);
+        |
+        |  if (shouldStop()) return;
+        |}
+        |$fastHashMapTerm.close();
+      """.stripMargin
     }
 
     // Iterate over the aggregate rows and convert them from ColumnarRow to UnsafeRow
     def outputFromVectorizedMap: String = {
       val row = ctx.freshName("fastHashMapRow")
       ctx.currentVars = null
       ctx.INPUT_ROW = row
-    val generateKeyRow = GenerateUnsafeProjection.createCode(ctx,
-      groupingKeySchema.toAttributes.zipWithIndex
-        .map { case (attr, i) => BoundReference(i, attr.dataType, attr.nullable) }
-    )
-    val generateBufferRow = GenerateUnsafeProjection.createCode(ctx,
-      bufferSchema.toAttributes.zipWithIndex
-        .map { case (attr, i) =>
-          BoundReference(groupingKeySchema.length + i, attr.dataType, attr.nullable) })
-    s"""
-       | while ($iterTermForFastHashMap.hasNext()) {
-       |   $numOutput.add(1);
-       |   org.apache.spark.sql.execution.vectorized.ColumnarRow $row =
-       |     (org.apache.spark.sql.execution.vectorized.ColumnarRow)
-       |       $iterTermForFastHashMap.next();
-       |   ${generateKeyRow.code}
-       |   ${generateBufferRow.code}
-       |   $outputFunc(${generateKeyRow.value}, ${generateBufferRow.value});
-       |
-       |   if (shouldStop()) return;
-       | }
-       |
-       | $fastHashMapTerm.close();
-     """.stripMargin
+      val generateKeyRow = GenerateUnsafeProjection.createCode(ctx,
+        groupingKeySchema.toAttributes.zipWithIndex
+          .map { case (attr, i) => BoundReference(i, attr.dataType, attr.nullable) }
+      )
+      val generateBufferRow = GenerateUnsafeProjection.createCode(ctx,
+        bufferSchema.toAttributes.zipWithIndex.map { case (attr, i) =>
+          BoundReference(groupingKeySchema.length + i, attr.dataType, attr.nullable)
+        })
+      val columnarRowCls = classOf[ColumnarRow].getName
+      s"""
+         |while ($iterTermForFastHashMap.hasNext()) {
+         |  $columnarRowCls $row = ($columnarRowCls) $iterTermForFastHashMap.next();
+         |  ${generateKeyRow.code}
+         |  ${generateBufferRow.code}
+         |  $outputFunc(${generateKeyRow.value}, ${generateBufferRow.value});
+         |
+         |  if (shouldStop()) return;
+         |}
+         |
+         |$fastHashMapTerm.close();
+       """.stripMargin
     }
 
+    def outputFromRegularHashMap: String = {
+      s"""
+         |while ($iterTerm.next()) {
--- End diff --

moved from https://github.com/apache/spark/pull/19869/files#diff-2eb948516b5beaeb746aadac27fbd5b4L731


---




[GitHub] spark pull request #19869: [SPARK-22677][SQL] cleanup whole stage codegen fo...

2017-12-03 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19869#discussion_r154528409
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala ---
@@ -672,48 +668,56 @@ case class HashAggregateExec(
 
     def outputFromRowBasedMap: String = {
       s"""
-       while ($iterTermForFastHashMap.next()) {
-         $numOutput.add(1);
-         UnsafeRow $keyTerm = (UnsafeRow) $iterTermForFastHashMap.getKey();
-         UnsafeRow $bufferTerm = (UnsafeRow) $iterTermForFastHashMap.getValue();
-         $outputFunc($keyTerm, $bufferTerm);
-
-         if (shouldStop()) return;
-       }
-       $fastHashMapTerm.close();
-     """
+        |while ($iterTermForFastHashMap.next()) {
+        |  UnsafeRow $keyTerm = (UnsafeRow) $iterTermForFastHashMap.getKey();
+        |  UnsafeRow $bufferTerm = (UnsafeRow) $iterTermForFastHashMap.getValue();
+        |  $outputFunc($keyTerm, $bufferTerm);
+        |
+        |  if (shouldStop()) return;
+        |}
+        |$fastHashMapTerm.close();
+      """.stripMargin
     }
 
     // Iterate over the aggregate rows and convert them from ColumnarRow to UnsafeRow
     def outputFromVectorizedMap: String = {
       val row = ctx.freshName("fastHashMapRow")
       ctx.currentVars = null
       ctx.INPUT_ROW = row
-    val generateKeyRow = GenerateUnsafeProjection.createCode(ctx,
-      groupingKeySchema.toAttributes.zipWithIndex
--- End diff --

The indentation was wrong previously.


---




[GitHub] spark pull request #19869: [SPARK-22677][SQL] cleanup whole stage codegen fo...

2017-12-03 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19869#discussion_r154528386
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala ---
@@ -444,6 +444,7 @@ case class HashAggregateExec(
     val funcName = ctx.freshName("doAggregateWithKeysOutput")
     val keyTerm = ctx.freshName("keyTerm")
     val bufferTerm = ctx.freshName("bufferTerm")
+    val numOutput = metricTerm(ctx, "numOutputRows")
--- End diff --

Update the `numOutputRows` metric in the result function instead of doing it for both the fast hash map and the regular hash map.
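
(An illustrative sketch of what that means for the generated code — stub names, not the exact output of `ctx.freshName`/`metricTerm`:)

```scala
// The metric is bumped once inside the shared output function, which both the
// fast hash map loop and the regular hash map loop call per result row.
val funcName = "agg_doAggregateWithKeysOutput"  // stub for ctx.freshName(...)
val numOutput = "agg_numOutputRows"             // stub for metricTerm(ctx, "numOutputRows")

val outputFuncCode: String =
  s"""
     |private void $funcName(UnsafeRow keyTerm, UnsafeRow bufferTerm)
     |    throws java.io.IOException {
     |  $numOutput.add(1);
     |  // ... produce and append the result row ...
     |}
   """.stripMargin
```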


---




[GitHub] spark pull request #19869: [SPARK-22677][SQL] cleanup whole stage codegen fo...

2017-12-03 Thread cloud-fan
GitHub user cloud-fan opened a pull request:

https://github.com/apache/spark/pull/19869

[SPARK-22677][SQL] cleanup whole stage codegen for hash aggregate

## What changes were proposed in this pull request?

The `HashAggregateExec` whole-stage codegen path is a little messy and hard to understand. This PR cleans it up a bit, especially the fast hash map part.

## How was this patch tested?

existing tests

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/cloud-fan/spark hash-agg

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/19869.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #19869


commit 174f4ec2ea000de01da6f494db366dc3ff58ccc4
Author: Wenchen Fan 
Date:   2017-11-24T12:43:37Z

cleanup whole stage codegen for hash aggregate




---
