GitHub user kiszk opened a pull request:
https://github.com/apache/spark/pull/19938
[SPARK-22747][SQL] Localize lifetime of mutable states in HashAggregateExec
## What changes were proposed in this pull request?
This PR localizes lifetime of mutable states, which are used for `isNull`
and `value` of aggregation results, in generated code by `HashAggregateExec`.
These states are passed to successor operations through the `consume()` method. This
assumption may be violated in #19865 when operations that use these variables
are split. In the following example, `agg_localBufValue` and
`agg_localBufisNull` are passed to a successor operation (`projection`).
This PR is based on @cloud-fan 's
[suggestion](https://github.com/apache/spark/pull/19865#issuecomment-348776654).
Without this PR
```
/* 005 */ final class GeneratedIterator extends
org.apache.spark.sql.execution.BufferedRowIterator {
/* 006 */ private Object[] references;
/* 007 */ private scala.collection.Iterator[] inputs;
/* 008 */ private boolean agg_initAgg;
/* 009 */ private boolean agg_bufIsNull;
/* 010 */ private long agg_bufValue;
/* 011 */ private scala.collection.Iterator inputadapter_input;
...
/* 039 */ private void agg_doAggregateWithoutKey() throws
java.io.IOException {
/* 040 */ // initialize aggregation buffer
/* 041 */ final long agg_value = -1L;
/* 042 */ agg_bufIsNull = true;
/* 043 */ agg_bufValue = agg_value;
/* 044 */
/* 045 */ while (inputadapter_input.hasNext() && !stopEarly()) {
/* 046 */ InternalRow inputadapter_row = (InternalRow)
inputadapter_input.next();
/* 047 */ boolean inputadapter_isNull = inputadapter_row.isNullAt(0);
/* 048 */ long inputadapter_value = inputadapter_isNull ? -1L :
(inputadapter_row.getLong(0));
...
/* 100 */ } while (false);
/* 101 */ final boolean agg_isNull3 = agg_coalesceTmpIsNull;
/* 102 */ // update aggregation buffer
/* 103 */ agg_bufIsNull = agg_isNull3;
/* 104 */ agg_bufValue = agg_value3;
/* 105 */ if (shouldStop()) return;
/* 106 */ }
/* 107 */
/* 108 */ }
/* 109 */
/* 110 */ protected void processNext() throws java.io.IOException {
/* 111 */ while (!agg_initAgg) {
/* 112 */ agg_initAgg = true;
/* 113 */ long agg_beforeAgg = System.nanoTime();
/* 114 */ agg_doAggregateWithoutKey();
/* 115 */ agg_aggTime.add((System.nanoTime() - agg_beforeAgg) /
100);
/* 116 */
/* 117 */ // output the result
/* 118 */
/* 119 */ agg_numOutputRows.add(1);
/* 120 */ agg_rowWriter.zeroOutNullBytes();
/* 121 */
/* 122 */ if (agg_bufisNull) {
/* 123 */ agg_rowWriter.setNullAt(0);
/* 124 */ } else {
/* 125 */ agg_rowWriter.write(0, agg_bufValue);
/* 126 */ }
/* 127 */ append(agg_result);
/* 128 */ }
/* 129 */ }
```
With this PR
```
/* 005 */ final class GeneratedIterator extends
org.apache.spark.sql.execution.BufferedRowIterator {
/* 006 */ private Object[] references;
/* 007 */ private scala.collection.Iterator[] inputs;
/* 008 */ private boolean agg_initAgg;
/* 009 */ private boolean agg_bufIsNull;
/* 010 */ private long agg_bufValue;
/* 011 */ private scala.collection.Iterator inputadapter_input;
...
/* 039 */ private void agg_doAggregateWithoutKey() throws
java.io.IOException {
/* 040 */ // initialize aggregation buffer
/* 041 */ final long agg_value = -1L;
/* 042 */ agg_bufIsNull = true;
/* 043 */ agg_bufValue = agg_value;
/* 044 */
/* 045 */ while (inputadapter_input.hasNext() && !stopEarly()) {
/* 046 */ InternalRow inputadapter_row = (InternalRow)
inputadapter_input.next();
/* 047 */ boolean inputadapter_isNull = inputadapter_row.isNullAt(0);
/* 048 */ long inputadapter_value = inputadapter_isNull ? -1L :
(inputadapter_row.getLong(0));
...
/* 100 */ } while (false);
/* 101 */ final boolean agg_isNull3 = agg_coalesceTmpIsNull;
/* 102 */ // update aggregation buffer
/* 103 */ agg_bufIsNull = agg_isNull3;
/* 104 */ agg_bufValue = agg_value3;
/* 105 */ if (shouldStop()) return;
/* 106 */ }
/* 107 */
/* 108 */ }
/* 109 */
/* 110 */ protected void processNext() throws java.io.IOException {
/* 111 */ while (!agg_initAgg) {
/* 112 */ agg_initAgg = true;
/* 113 */ long agg_beforeAgg = System.nanoTime();
/* 114 */ agg_doAggregateWithoutKey();
/* 115 */ agg_aggTime.add((System.nanoTime() - agg_beforeAgg) /
100);
/* 116 */
/* 117 */ // output the result
/* 118 */
/* 119 */