GitHub user kiszk opened a pull request:

    https://github.com/apache/spark/pull/17378

    [SPARK-20046][SQL] Facilitate loop optimizations in a JIT compiler 
regarding sqlContext.read.parquet()

    ## What changes were proposed in this pull request?
    
    This PR improves performance of operations with `sqlContext.read.parquet()` 
by changing Java code generated by Catalyst. This PR is inspired by [the blog 
article](https://databricks.com/blog/2017/02/16/processing-trillion-rows-per-second-single-machine-can-nested-loop-joins-fast.html)
 and [this stackoverflow 
entry](http://stackoverflow.com/questions/40629435/fast-parquet-row-count-in-spark).
    
    This PR changes generated code in the following two points.
    1. Replace a while-loop with long instance variables a for-loop with int 
local variables
    2. Suppress generation of `shouldStop()` method if this method is 
unnecessary (e.g. `append()` is not generated).
    
    These points facilitates compiler optimizations in a JIT compiler by 
feeding the simplified Java code into the JIT compiler. The performance of 
`sqlContext.read.parquet().count` is improved by 1.09x.
    
    Benchmark program:
    ```java
    val dir = "/dev/shm/parquet"
    val N = 1000 * 1000 * 40
    val iters = 20
    val benchmark = new Benchmark("Parquet", N * iters, minNumIters = 5, 
warmupTime = 30.seconds)
    sparkSession.range(n).write.mode("overwrite").parquet(dir)
    
    benchmark.addCase("count") { i: Int =>
      var n = 0
      var len = 0L
      while (n < iters) {
        len += sparkSession.read.parquet(dir).count
        n += 1
      }
    }
    benchmark.run
    ```
    
    Performance result without this PR
    ```
    OpenJDK 64-Bit Server VM 1.8.0_121-8u121-b13-0ubuntu1.16.04.2-b13 on Linux 
4.4.0-47-generic
    Intel(R) Xeon(R) CPU E5-2667 v3 @ 3.20GHz
    Parquet:                                 Best/Avg Time(ms)    Rate(M/s)   
Per Row(ns)   Relative
    
------------------------------------------------------------------------------------------------
    w/o this PR                                   1152 / 1211        694.7      
     1.4       1.0X
    ```
    
    Performance result with this PR
    ```
    OpenJDK 64-Bit Server VM 1.8.0_121-8u121-b13-0ubuntu1.16.04.2-b13 on Linux 
4.4.0-47-generic
    Intel(R) Xeon(R) CPU E5-2667 v3 @ 3.20GHz
    Parquet:                                 Best/Avg Time(ms)    Rate(M/s)   
Per Row(ns)   Relative
    
------------------------------------------------------------------------------------------------
    with this PR                                  1053 / 1121        760.0      
     1.3       1.0X
    ```
    
    Here is a comparison between generated code w/o and with this PR. Only the 
method ```agg_doAggregateWithoutKey``` is changed.
    
    Generated code without this PR
    ```java
    /* 005 */ final class GeneratedIterator extends 
org.apache.spark.sql.execution.BufferedRowIterator {
    /* 006 */   private Object[] references;
    /* 007 */   private scala.collection.Iterator[] inputs;
    /* 008 */   private boolean agg_initAgg;
    /* 009 */   private boolean agg_bufIsNull;
    /* 010 */   private long agg_bufValue;
    /* 011 */   private scala.collection.Iterator scan_input;
    /* 012 */   private org.apache.spark.sql.execution.metric.SQLMetric 
scan_numOutputRows;
    /* 013 */   private org.apache.spark.sql.execution.metric.SQLMetric 
scan_scanTime;
    /* 014 */   private long scan_scanTime1;
    /* 015 */   private org.apache.spark.sql.execution.vectorized.ColumnarBatch 
scan_batch;
    /* 016 */   private int scan_batchIdx;
    /* 017 */   private org.apache.spark.sql.execution.metric.SQLMetric 
agg_numOutputRows;
    /* 018 */   private org.apache.spark.sql.execution.metric.SQLMetric 
agg_aggTime;
    /* 019 */   private UnsafeRow agg_result;
    /* 020 */   private 
org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder agg_holder;
    /* 021 */   private 
org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter agg_rowWriter;
    /* 022 */
    /* 023 */   public GeneratedIterator(Object[] references) {
    /* 024 */     this.references = references;
    /* 025 */   }
    /* 026 */
    /* 027 */   public void init(int index, scala.collection.Iterator[] inputs) 
{
    /* 028 */     partitionIndex = index;
    /* 029 */     this.inputs = inputs;
    /* 030 */     agg_initAgg = false;
    /* 031 */
    /* 032 */     scan_input = inputs[0];
    /* 033 */     this.scan_numOutputRows = 
(org.apache.spark.sql.execution.metric.SQLMetric) references[0];
    /* 034 */     this.scan_scanTime = 
(org.apache.spark.sql.execution.metric.SQLMetric) references[1];
    /* 035 */     scan_scanTime1 = 0;
    /* 036 */     scan_batch = null;
    /* 037 */     scan_batchIdx = 0;
    /* 038 */     this.agg_numOutputRows = 
(org.apache.spark.sql.execution.metric.SQLMetric) references[2];
    /* 039 */     this.agg_aggTime = 
(org.apache.spark.sql.execution.metric.SQLMetric) references[3];
    /* 040 */     agg_result = new UnsafeRow(1);
    /* 041 */     this.agg_holder = new 
org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(agg_result, 0);
    /* 042 */     this.agg_rowWriter = new 
org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(agg_holder, 
1);
    /* 043 */
    /* 044 */   }
    /* 045 */
    /* 046 */   private void agg_doAggregateWithoutKey() throws 
java.io.IOException {
    /* 047 */     // initialize aggregation buffer
    /* 048 */     agg_bufIsNull = false;
    /* 049 */     agg_bufValue = 0L;
    /* 050 */
    /* 051 */     if (scan_batch == null) {
    /* 052 */       scan_nextBatch();
    /* 053 */     }
    /* 054 */     while (scan_batch != null) {
    /* 055 */       int numRows = scan_batch.numRows();
    /* 056 */       while (scan_batchIdx < numRows) {
    /* 057 */         int scan_rowIdx = scan_batchIdx++;
    /* 058 */         // do aggregate
    /* 059 */         // common sub-expressions
    /* 060 */
    /* 061 */         // evaluate aggregate function
    /* 062 */         boolean agg_isNull1 = false;
    /* 063 */
    /* 064 */         long agg_value1 = -1L;
    /* 065 */         agg_value1 = agg_bufValue + 1L;
    /* 066 */         // update aggregation buffer
    /* 067 */         agg_bufIsNull = false;
    /* 068 */         agg_bufValue = agg_value1;
    /* 069 */         if (shouldStop()) return;
    /* 070 */       }
    /* 071 */       scan_batch = null;
    /* 072 */       scan_nextBatch();
    /* 073 */     }
    /* 074 */     scan_scanTime.add(scan_scanTime1 / (1000 * 1000));
    /* 075 */     scan_scanTime1 = 0;
    /* 076 */
    /* 077 */   }
    /* 078 */
    /* 079 */   private void scan_nextBatch() throws java.io.IOException {
    /* 080 */     long getBatchStart = System.nanoTime();
    /* 081 */     if (scan_input.hasNext()) {
    /* 082 */       scan_batch = 
(org.apache.spark.sql.execution.vectorized.ColumnarBatch)scan_input.next();
    /* 083 */       scan_numOutputRows.add(scan_batch.numRows());
    /* 084 */       scan_batchIdx = 0;
    /* 085 */
    /* 086 */     }
    /* 087 */     scan_scanTime1 += System.nanoTime() - getBatchStart;
    /* 088 */   }
    /* 089 */
    /* 090 */   protected void processNext() throws java.io.IOException {
    /* 091 */     while (!agg_initAgg) {
    /* 092 */       agg_initAgg = true;
    /* 093 */       long agg_beforeAgg = System.nanoTime();
    /* 094 */       agg_doAggregateWithoutKey();
    /* 095 */       agg_aggTime.add((System.nanoTime() - agg_beforeAgg) / 
1000000);
    /* 096 */
    /* 097 */       // output the result
    /* 098 */
    /* 099 */       agg_numOutputRows.add(1);
    /* 100 */       agg_rowWriter.zeroOutNullBytes();
    /* 101 */
    /* 102 */       if (agg_bufIsNull) {
    /* 103 */         agg_rowWriter.setNullAt(0);
    /* 104 */       } else {
    /* 105 */         agg_rowWriter.write(0, agg_bufValue);
    /* 106 */       }
    /* 107 */       append(agg_result);
    /* 108 */     }
    /* 109 */   }
    /* 110 */ }
    ```
    
    Generated code with this PR
    ```java
    /* 005 */ final class GeneratedIterator extends 
org.apache.spark.sql.execution.BufferedRowIterator {
    /* 006 */   private Object[] references;
    /* 007 */   private scala.collection.Iterator[] inputs;
    /* 008 */   private boolean agg_initAgg;
    /* 009 */   private boolean agg_bufIsNull;
    /* 010 */   private long agg_bufValue;
    /* 011 */   private scala.collection.Iterator scan_input;
    /* 012 */   private org.apache.spark.sql.execution.metric.SQLMetric 
scan_numOutputRows;
    /* 013 */   private org.apache.spark.sql.execution.metric.SQLMetric 
scan_scanTime;
    /* 014 */   private long scan_scanTime1;
    /* 015 */   private org.apache.spark.sql.execution.vectorized.ColumnarBatch 
scan_batch;
    /* 016 */   private int scan_batchIdx;
    /* 017 */   private org.apache.spark.sql.execution.metric.SQLMetric 
agg_numOutputRows;
    /* 018 */   private org.apache.spark.sql.execution.metric.SQLMetric 
agg_aggTime;
    /* 019 */   private UnsafeRow agg_result;
    /* 020 */   private 
org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder agg_holder;
    /* 021 */   private 
org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter agg_rowWriter;
    /* 022 */
    /* 023 */   public GeneratedIterator(Object[] references) {
    /* 024 */     this.references = references;
    /* 025 */   }
    /* 026 */
    /* 027 */   public void init(int index, scala.collection.Iterator[] inputs) 
{
    /* 028 */     partitionIndex = index;
    /* 029 */     this.inputs = inputs;
    /* 030 */     agg_initAgg = false;
    /* 031 */
    /* 032 */     scan_input = inputs[0];
    /* 033 */     this.scan_numOutputRows = 
(org.apache.spark.sql.execution.metric.SQLMetric) references[0];
    /* 034 */     this.scan_scanTime = 
(org.apache.spark.sql.execution.metric.SQLMetric) references[1];
    /* 035 */     scan_scanTime1 = 0;
    /* 036 */     scan_batch = null;
    /* 037 */     scan_batchIdx = 0;
    /* 038 */     this.agg_numOutputRows = 
(org.apache.spark.sql.execution.metric.SQLMetric) references[2];
    /* 039 */     this.agg_aggTime = 
(org.apache.spark.sql.execution.metric.SQLMetric) references[3];
    /* 040 */     agg_result = new UnsafeRow(1);
    /* 041 */     this.agg_holder = new 
org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(agg_result, 0);
    /* 042 */     this.agg_rowWriter = new 
org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(agg_holder, 
1);
    /* 043 */
    /* 044 */   }
    /* 045 */
    /* 046 */   private void agg_doAggregateWithoutKey() throws 
java.io.IOException {
    /* 047 */     // initialize aggregation buffer
    /* 048 */     agg_bufIsNull = false;
    /* 049 */     agg_bufValue = 0L;
    /* 050 */
    /* 051 */     if (scan_batch == null) {
    /* 052 */       scan_nextBatch();
    /* 053 */     }
    /* 054 */     while (scan_batch != null) {
    /* 055 */       int numRows = scan_batch.numRows();
    /* 056 */       int scan_localEnd = numRows - scan_batchIdx;
    /* 057 */       for (int scan_localIdx = 0; scan_localIdx < scan_localEnd; 
scan_localIdx++) {
    /* 058 */         int scan_rowIdx = scan_batchIdx + scan_localIdx;
    /* 059 */         // do aggregate
    /* 060 */         // common sub-expressions
    /* 061 */
    /* 062 */         // evaluate aggregate function
    /* 063 */         boolean agg_isNull1 = false;
    /* 064 */
    /* 065 */         long agg_value1 = -1L;
    /* 066 */         agg_value1 = agg_bufValue + 1L;
    /* 067 */         // update aggregation buffer
    /* 068 */         agg_bufIsNull = false;
    /* 069 */         agg_bufValue = agg_value1;
    /* 070 */         // shouldStop check is eliminated
    /* 071 */       }
    /* 072 */       scan_batchIdx = numRows;
    /* 073 */       scan_batch = null;
    /* 074 */       scan_nextBatch();
    /* 075 */     }
    /* 079 */   }
    /* 080 */
    /* 081 */   private void scan_nextBatch() throws java.io.IOException {
    /* 082 */     long getBatchStart = System.nanoTime();
    /* 083 */     if (scan_input.hasNext()) {
    /* 084 */       scan_batch = 
(org.apache.spark.sql.execution.vectorized.ColumnarBatch)scan_input.next();
    /* 085 */       scan_numOutputRows.add(scan_batch.numRows());
    /* 086 */       scan_batchIdx = 0;
    /* 087 */
    /* 088 */     }
    /* 089 */     scan_scanTime1 += System.nanoTime() - getBatchStart;
    /* 090 */   }
    /* 091 */
    /* 092 */   protected void processNext() throws java.io.IOException {
    /* 093 */     while (!agg_initAgg) {
    /* 094 */       agg_initAgg = true;
    /* 095 */       long agg_beforeAgg = System.nanoTime();
    /* 096 */       agg_doAggregateWithoutKey();
    /* 097 */       agg_aggTime.add((System.nanoTime() - agg_beforeAgg) / 
1000000);
    /* 098 */
    /* 099 */       // output the result
    /* 100 */
    /* 101 */       agg_numOutputRows.add(1);
    /* 102 */       agg_rowWriter.zeroOutNullBytes();
    /* 103 */
    /* 104 */       if (agg_bufIsNull) {
    /* 105 */         agg_rowWriter.setNullAt(0);
    /* 106 */       } else {
    /* 107 */         agg_rowWriter.write(0, agg_bufValue);
    /* 108 */       }
    /* 109 */       append(agg_result);
    /* 110 */     }
    /* 111 */   }
    /* 112 */ }
    ```
    
    ## How was this patch tested?
    
    Tested existing test suites

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/kiszk/spark SPARK-20046

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/17378.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #17378
    
----
commit d74b6cf5fb63479040e940e5797e0b226367b227
Author: Kazuaki Ishizaki <ishiz...@jp.ibm.com>
Date:   2017-03-21T19:23:38Z

    initial commit

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to