GitHub user kiszk opened a pull request:

    https://github.com/apache/spark/pull/11636

    [SPARK-13805][SQL] Generate code that gets a value in each column from ColumnVector when ColumnarBatch is used

    ## What changes were proposed in this pull request?
    
    This PR generates code that reads each column value directly from ```ColumnVector``` instead of creating an ```InternalRow``` when a ```ColumnarBatch``` is accessed. It improves the benchmark program by up to 15%.
    This PR consists of two parts:
    
    1. Get a ```ColumnVector``` by using the ```ColumnarBatch.column()``` method.
    2. Get the value of each column by using ```rdd_col${COLIDX}.getInt(ROWIDX)``` instead of ```rdd_row.getInt(COLIDX)```.
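
The two access paths in the list above can be illustrated with a minimal, self-contained Java sketch. The types IntColumn, Batch, and RowView are hypothetical stand-ins written only for this illustration; they are not Spark's ColumnVector, ColumnarBatch, or InternalRow, and the sketch makes no claim about how Spark implements them. It only shows that reading through a row view and reading the column directly return the same values, with the direct path skipping one level of indirection.

```java
// Toy model of row-based vs. column-based access.
// All types here are hypothetical; none of them are Spark classes.
final class ColumnAccessSketch {

    /** A toy column of ints with a parallel null mask. */
    static final class IntColumn {
        private final int[] values;
        private final boolean[] nulls;

        IntColumn(int[] values, boolean[] nulls) {
            this.values = values;
            this.nulls = nulls;
        }

        boolean isNullAt(int rowIdx) { return nulls[rowIdx]; }
        int getInt(int rowIdx) { return values[rowIdx]; }
    }

    /** A toy batch: a fixed set of columns that share one row count. */
    static final class Batch {
        private final IntColumn[] columns;
        private final int numRows;

        Batch(IntColumn[] columns, int numRows) {
            this.columns = columns;
            this.numRows = numRows;
        }

        int numRows() { return numRows; }
        IntColumn column(int colIdx) { return columns[colIdx]; }
        RowView getRow(int rowIdx) { return new RowView(this, rowIdx); }
    }

    /** A row-shaped view that forwards every read to the underlying column. */
    static final class RowView {
        private final Batch batch;
        private final int rowIdx;

        RowView(Batch batch, int rowIdx) {
            this.batch = batch;
            this.rowIdx = rowIdx;
        }

        boolean isNullAt(int colIdx) { return batch.column(colIdx).isNullAt(rowIdx); }
        int getInt(int colIdx) { return batch.column(colIdx).getInt(rowIdx); }
    }

    /** Row path: obtain a row view for each row, then read the column through it. */
    static long sumViaRows(Batch batch, int colIdx) {
        long sum = 0;
        for (int rowIdx = 0; rowIdx < batch.numRows(); rowIdx++) {
            RowView row = batch.getRow(rowIdx);
            if (!row.isNullAt(colIdx)) {
                sum += row.getInt(colIdx);
            }
        }
        return sum;
    }

    /** Column path: fetch the column, then index it by row number. */
    static long sumViaColumn(Batch batch, int colIdx) {
        IntColumn col = batch.column(colIdx);
        long sum = 0;
        for (int rowIdx = 0; rowIdx < batch.numRows(); rowIdx++) {
            if (!col.isNullAt(rowIdx)) {
                sum += col.getInt(rowIdx);
            }
        }
        return sum;
    }

    public static void main(String[] args) {
        IntColumn p = new IntColumn(
            new int[] {0, 1, 0, 1, 0},
            new boolean[] {false, false, true, false, false});
        Batch batch = new Batch(new IntColumn[] {p}, 5);
        System.out.println(sumViaRows(batch, 0));   // prints 2
        System.out.println(sumViaColumn(batch, 0)); // prints 2
    }
}
```

Both methods skip the null entry and return the same sum; only the route to the value differs.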
    
    
    This is a motivating example.
    ````
        sqlContext.conf.setConfString(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key, "true")
        sqlContext.conf.setConfString(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key, "true")
        val values = 10
        withTempPath { dir =>
          withTempTable("t1", "tempTable") {
            sqlContext.range(values).registerTempTable("t1")
            sqlContext.sql("select id % 2 as p, cast(id as INT) as id from t1")
              .write.partitionBy("p").parquet(dir.getCanonicalPath)
            sqlContext.read.parquet(dir.getCanonicalPath).registerTempTable("tempTable")
            sqlContext.sql("select sum(p) from tempTable").collect
          }
        }
    ```` 
    
    The original code
    ````
        ...
        /* 072 */       while (!shouldStop() && rdd_batchIdx < numRows) {
        /* 073 */         InternalRow rdd_row = rdd_batch.getRow(rdd_batchIdx++);
        /* 074 */         /*** CONSUME: TungstenAggregate(key=[], functions=[(sum(cast(p#4 as bigint)),mode=Partial,isDistinct=false)], output=[sum#10L]) */
        /* 075 */         /* input[0, int] */
        /* 076 */         boolean rdd_isNull = rdd_row.isNullAt(0);
        /* 077 */         int rdd_value = rdd_isNull ? -1 : (rdd_row.getInt(0));
        ...
    ````
    
    The code generated by this PR
    ````
        /* 072 */       while (!shouldStop() && rdd_batchIdx < numRows) {
        /* 073 */         org.apache.spark.sql.execution.vectorized.ColumnVector rdd_col0 = rdd_batch.column(0);
        /* 074 */         /*** CONSUME: TungstenAggregate(key=[], functions=[(sum(cast(p#4 as bigint)),mode=Partial,isDistinct=false)], output=[sum#10L]) */
        /* 075 */         /* input[0, int] */
        /* 076 */         boolean rdd_isNull = rdd_col0.getIsNull(rdd_batchIdx);
        /* 077 */         int rdd_value = rdd_isNull ? -1 : (rdd_col0.getInt(rdd_batchIdx));
        ...
        /* 128 */         rdd_batchIdx++;
        /* 129 */       }
        /* 130 */       if (shouldStop()) return;
    
    ````
    Performance
    Without this PR
    ````
    model name  : Intel(R) Xeon(R) CPU E5-2667 v2 @ 3.30GHz
    Partitioned Table:                  Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
    -------------------------------------------------------------------------------------------
    Read data column                          434 /  488         36.3          27.6       1.0X
    Read partition column                     302 /  346         52.1          19.2       1.4X
    Read both columns                         588 /  643         26.8          37.4       0.7X
    ````
    With this PR
    ````
    model name  : Intel(R) Xeon(R) CPU E5-2667 v2 @ 3.30GHz
    Partitioned Table:                  Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
    -------------------------------------------------------------------------------------------
    Read data column                          392 /  516         40.1          24.9       1.0X
    Read partition column                     256 /  318         61.4          16.3       1.5X
    Read both columns                         523 /  539         30.1          33.3       0.7X
    ````
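
As a rough check of the "up to 15%" figure, the best times from the two tables above can be compared directly. The following is a small sketch, not part of the PR; the class and method names are made up for illustration, and it uses only the Best Time(ms) values. The average times are left out and do not improve in every case.

```java
import java.util.Locale;

// Computes the reduction in best time for each benchmark case.
// Names are hypothetical; the numbers are copied from the tables above.
final class BestTimeImprovement {

    /** Percentage by which afterMs is lower than beforeMs. */
    static double percentReduction(double beforeMs, double afterMs) {
        return 100.0 * (beforeMs - afterMs) / beforeMs;
    }

    public static void main(String[] args) {
        String[] cases = {"Read data column", "Read partition column", "Read both columns"};
        double[] withoutPr = {434, 302, 588}; // best time (ms) without this PR
        double[] withPr = {392, 256, 523};    // best time (ms) with this PR

        for (int i = 0; i < cases.length; i++) {
            System.out.printf(Locale.ROOT, "%-22s %5.1f%%%n",
                cases[i], percentReduction(withoutPr[i], withPr[i]));
        }
    }
}
```

This prints reductions of about 9.7%, 15.2%, and 11.1% for the three cases, so the largest gain in best time is on the partition column read.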
    
    ## How was this patch tested?
    Tested by existing test suites and benchmark


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/kiszk/spark SPARK-13805

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/11636.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #11636
    
----
commit 0db679f9bdc410cd7483faa366557cc5e5925889
Author: Kazuaki Ishizaki <ishiz...@jp.ibm.com>
Date:   2016-03-10T19:15:05Z

    Generate code that get a value in each column from ColumnVector when ColumnarVector is used

----

