GitHub user kiszk opened a pull request:

    https://github.com/apache/spark/pull/19938

    [SPARK-22747][SQL] Localize lifetime of mutable states in HashAggregateExec

    ## What changes were proposed in this pull request?
    
    This PR localizes the lifetime of mutable states, which are used for `isNull` 
and `value` of aggregation results, in the code generated by `HashAggregateExec`. 
    
    These states are passed to successor operations through the `consume()` method. 
This assumption may be violated at #19865 when operations that use these variables 
are split.  In the following example, `agg_localBufValue` and 
`agg_localBufisNull` are passed to a successor operation (`projection`).
    
    This PR is based on @cloud-fan 's 
[suggestion](https://github.com/apache/spark/pull/19865#issuecomment-348776654).
    
    Without this PR
    ```
    /* 005 */ final class GeneratedIterator extends 
org.apache.spark.sql.execution.BufferedRowIterator {
    /* 006 */   private Object[] references;
    /* 007 */   private scala.collection.Iterator[] inputs;
    /* 008 */   private boolean agg_initAgg;
    /* 009 */   private boolean agg_bufIsNull;
    /* 010 */   private long agg_bufValue;
    /* 011 */   private scala.collection.Iterator inputadapter_input;
    ...
    /* 039 */   private void agg_doAggregateWithoutKey() throws 
java.io.IOException {
    /* 040 */     // initialize aggregation buffer
    /* 041 */     final long agg_value = -1L;
    /* 042 */     agg_bufIsNull = true;
    /* 043 */     agg_bufValue = agg_value;
    /* 044 */
    /* 045 */     while (inputadapter_input.hasNext() && !stopEarly()) {
    /* 046 */       InternalRow inputadapter_row = (InternalRow) 
inputadapter_input.next();
    /* 047 */       boolean inputadapter_isNull = inputadapter_row.isNullAt(0);
    /* 048 */       long inputadapter_value = inputadapter_isNull ? -1L : 
(inputadapter_row.getLong(0));
    ...
    /* 100 */       } while (false);
    /* 101 */       final boolean agg_isNull3 = agg_coalesceTmpIsNull;
    /* 102 */       // update aggregation buffer
    /* 103 */       agg_bufIsNull = agg_isNull3;
    /* 104 */       agg_bufValue = agg_value3;
    /* 105 */       if (shouldStop()) return;
    /* 106 */     }
    /* 107 */
    /* 108 */   }
    /* 109 */
    /* 110 */   protected void processNext() throws java.io.IOException {
    /* 111 */     while (!agg_initAgg) {
    /* 112 */       agg_initAgg = true;
    /* 113 */       long agg_beforeAgg = System.nanoTime();
    /* 114 */       agg_doAggregateWithoutKey();
    /* 115 */       agg_aggTime.add((System.nanoTime() - agg_beforeAgg) / 
1000000);
    /* 116 */
    /* 117 */       // output the result
    /* 118 */
    /* 119 */       agg_numOutputRows.add(1);
    /* 120 */       agg_rowWriter.zeroOutNullBytes();
    /* 121 */
    /* 122 */       if (agg_bufIsNull) {
    /* 123 */         agg_rowWriter.setNullAt(0);
    /* 124 */       } else {
    /* 125 */         agg_rowWriter.write(0, agg_bufValue);
    /* 126 */       }
    /* 127 */       append(agg_result);
    /* 128 */     }
    /* 129 */   }
    ```
    
    With this PR
    ```
    /* 005 */ final class GeneratedIterator extends 
org.apache.spark.sql.execution.BufferedRowIterator {
    /* 006 */   private Object[] references;
    /* 007 */   private scala.collection.Iterator[] inputs;
    /* 008 */   private boolean agg_initAgg;
    /* 009 */   private boolean agg_bufIsNull;
    /* 010 */   private long agg_bufValue;
    /* 011 */   private scala.collection.Iterator inputadapter_input;
    ...
    /* 039 */   private void agg_doAggregateWithoutKey() throws 
java.io.IOException {
    /* 040 */     // initialize aggregation buffer
    /* 041 */     final long agg_value = -1L;
    /* 042 */     agg_bufIsNull = true;
    /* 043 */     agg_bufValue = agg_value;
    /* 044 */
    /* 045 */     while (inputadapter_input.hasNext() && !stopEarly()) {
    /* 046 */       InternalRow inputadapter_row = (InternalRow) 
inputadapter_input.next();
    /* 047 */       boolean inputadapter_isNull = inputadapter_row.isNullAt(0);
    /* 048 */       long inputadapter_value = inputadapter_isNull ? -1L : 
(inputadapter_row.getLong(0));
    ...
    /* 100 */       } while (false);
    /* 101 */       final boolean agg_isNull3 = agg_coalesceTmpIsNull;
    /* 102 */       // update aggregation buffer
    /* 103 */       agg_bufIsNull = agg_isNull3;
    /* 104 */       agg_bufValue = agg_value3;
    /* 105 */       if (shouldStop()) return;
    /* 106 */     }
    /* 107 */
    /* 108 */   }
    /* 109 */
    /* 110 */   protected void processNext() throws java.io.IOException {
    /* 111 */     while (!agg_initAgg) {
    /* 112 */       agg_initAgg = true;
    /* 113 */       long agg_beforeAgg = System.nanoTime();
    /* 114 */       agg_doAggregateWithoutKey();
    /* 115 */       agg_aggTime.add((System.nanoTime() - agg_beforeAgg) / 
1000000);
    /* 116 */
    /* 117 */       // output the result
    /* 118 */
    /* 119 */       boolean agg_localBufisNull = agg_bufIsNull;
    /* 120 */       long agg_localBufValue = agg_bufValue;
    /* 121 */
    /* 122 */       agg_numOutputRows.add(1);
    /* 123 */       agg_rowWriter.zeroOutNullBytes();
    /* 124 */
    /* 125 */       if (agg_localBufisNull) {
    /* 126 */         agg_rowWriter.setNullAt(0);
    /* 127 */       } else {
    /* 128 */         agg_rowWriter.write(0, agg_localBufValue);
    /* 129 */       }
    /* 130 */       append(agg_result);
    /* 131 */     }
    /* 132 */   }
    ```
    
    
    ## How was this patch tested?
    
    Existing test suites

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/kiszk/spark SPARK-22747

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/19938.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #19938
    
----
commit ac65dd21b975481f5c4feb5f0745a2c45d000728
Author: Kazuaki Ishizaki <ishiz...@jp.ibm.com>
Date:   2017-12-10T07:06:43Z

    initial commit

----


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to