Repository: spark
Updated Branches:
  refs/heads/master 2422c86f2 -> 93bb0b911


[SPARK-20046][SQL] Facilitate loop optimizations in a JIT compiler regarding 
sqlContext.read.parquet()

## What changes were proposed in this pull request?

This PR improves the performance of operations that use `sqlContext.read.parquet()` by 
changing the Java code generated by Catalyst. It is inspired by [the blog 
article](https://databricks.com/blog/2017/02/16/processing-trillion-rows-per-second-single-machine-can-nested-loop-joins-fast.html)
 and [this Stack Overflow 
entry](http://stackoverflow.com/questions/40629435/fast-parquet-row-count-in-spark).

This PR changes the generated code in two ways:
1. Replace a while-loop that uses long instance variables with a for-loop that uses int 
local variables.
2. Suppress generation of the `shouldStop()` check when it is unnecessary (e.g. when no 
`append()` call is generated inside the loop).

These changes facilitate loop optimizations in the JIT compiler by feeding it simpler 
Java code. The performance of `sqlContext.read.parquet().count` improves by 1.09x.
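
As a rough, standalone illustration (not the actual generated code; the class and 
variable names below are made up), the old form walks a long instance field and checks 
`shouldStop()` on every row, while the new form iterates with int locals over a 
per-batch bound and drops the check when nothing is appended inside the loop:

```java
// Illustrative only: a hypothetical stand-in for the generated iterator.
class LoopShapes {
  private long idx;    // instance field driving the old-style loop
  private long count;  // aggregation buffer (count(*))

  // Old shape: while-loop over a long instance variable, per-row shouldStop() check.
  void consumeBatchOld(int numRows) {
    while (idx < numRows) {
      long rowIdx = idx++;
      count += 1L;               // aggregate one row
      if (shouldStop()) return;
    }
  }

  // New shape: for-loop over int locals; the induction variable can stay in a
  // register, making it easier for the JIT to unroll and optimize the loop.
  // The shouldStop() check is omitted because no row is appended in the body.
  void consumeBatchNew(int numRows) {
    int localEnd = numRows - (int) idx;
    for (int localIdx = 0; localIdx < localEnd; localIdx++) {
      count += 1L;               // aggregate one row
    }
    idx = numRows;
  }

  private boolean shouldStop() { return false; }  // placeholder
}
```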

Benchmark program:
```scala
// Assumes a spark-shell / test session where `sparkSession` is in scope and the
// internal org.apache.spark.util.Benchmark utility is on the classpath.
import scala.concurrent.duration._
import org.apache.spark.util.Benchmark

val dir = "/dev/shm/parquet"
val N = 1000 * 1000 * 40
val iters = 20
val benchmark = new Benchmark("Parquet", N * iters, minNumIters = 5, warmupTime = 30.seconds)
sparkSession.range(N).write.mode("overwrite").parquet(dir)

benchmark.addCase("count") { i: Int =>
  var n = 0
  var len = 0L
  while (n < iters) {
    len += sparkSession.read.parquet(dir).count
    n += 1
  }
}
benchmark.run
```

Performance result without this PR
```
OpenJDK 64-Bit Server VM 1.8.0_121-8u121-b13-0ubuntu1.16.04.2-b13 on Linux 4.4.0-47-generic
Intel(R) Xeon(R) CPU E5-2667 v3  3.20GHz
Parquet:                                 Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
w/o this PR                                   1152 / 1211        694.7           1.4       1.0X
```

Performance result with this PR
```
OpenJDK 64-Bit Server VM 1.8.0_121-8u121-b13-0ubuntu1.16.04.2-b13 on Linux 4.4.0-47-generic
Intel(R) Xeon(R) CPU E5-2667 v3  3.20GHz
Parquet:                                 Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
with this PR                                  1053 / 1121        760.0           1.3       1.0X
```

Here is a comparison of the generated code without and with this PR. Only the 
method `agg_doAggregateWithoutKey` changes.

Generated code without this PR
```java
/* 005 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 006 */   private Object[] references;
/* 007 */   private scala.collection.Iterator[] inputs;
/* 008 */   private boolean agg_initAgg;
/* 009 */   private boolean agg_bufIsNull;
/* 010 */   private long agg_bufValue;
/* 011 */   private scala.collection.Iterator scan_input;
/* 012 */   private org.apache.spark.sql.execution.metric.SQLMetric scan_numOutputRows;
/* 013 */   private org.apache.spark.sql.execution.metric.SQLMetric scan_scanTime;
/* 014 */   private long scan_scanTime1;
/* 015 */   private org.apache.spark.sql.execution.vectorized.ColumnarBatch scan_batch;
/* 016 */   private int scan_batchIdx;
/* 017 */   private org.apache.spark.sql.execution.metric.SQLMetric agg_numOutputRows;
/* 018 */   private org.apache.spark.sql.execution.metric.SQLMetric agg_aggTime;
/* 019 */   private UnsafeRow agg_result;
/* 020 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder agg_holder;
/* 021 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter agg_rowWriter;
/* 022 */
/* 023 */   public GeneratedIterator(Object[] references) {
/* 024 */     this.references = references;
/* 025 */   }
/* 026 */
/* 027 */   public void init(int index, scala.collection.Iterator[] inputs) {
/* 028 */     partitionIndex = index;
/* 029 */     this.inputs = inputs;
/* 030 */     agg_initAgg = false;
/* 031 */
/* 032 */     scan_input = inputs[0];
/* 033 */     this.scan_numOutputRows = (org.apache.spark.sql.execution.metric.SQLMetric) references[0];
/* 034 */     this.scan_scanTime = (org.apache.spark.sql.execution.metric.SQLMetric) references[1];
/* 035 */     scan_scanTime1 = 0;
/* 036 */     scan_batch = null;
/* 037 */     scan_batchIdx = 0;
/* 038 */     this.agg_numOutputRows = (org.apache.spark.sql.execution.metric.SQLMetric) references[2];
/* 039 */     this.agg_aggTime = (org.apache.spark.sql.execution.metric.SQLMetric) references[3];
/* 040 */     agg_result = new UnsafeRow(1);
/* 041 */     this.agg_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(agg_result, 0);
/* 042 */     this.agg_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(agg_holder, 1);
/* 043 */
/* 044 */   }
/* 045 */
/* 046 */   private void agg_doAggregateWithoutKey() throws java.io.IOException {
/* 047 */     // initialize aggregation buffer
/* 048 */     agg_bufIsNull = false;
/* 049 */     agg_bufValue = 0L;
/* 050 */
/* 051 */     if (scan_batch == null) {
/* 052 */       scan_nextBatch();
/* 053 */     }
/* 054 */     while (scan_batch != null) {
/* 055 */       int numRows = scan_batch.numRows();
/* 056 */       while (scan_batchIdx < numRows) {
/* 057 */         int scan_rowIdx = scan_batchIdx++;
/* 058 */         // do aggregate
/* 059 */         // common sub-expressions
/* 060 */
/* 061 */         // evaluate aggregate function
/* 062 */         boolean agg_isNull1 = false;
/* 063 */
/* 064 */         long agg_value1 = -1L;
/* 065 */         agg_value1 = agg_bufValue + 1L;
/* 066 */         // update aggregation buffer
/* 067 */         agg_bufIsNull = false;
/* 068 */         agg_bufValue = agg_value1;
/* 069 */         if (shouldStop()) return;
/* 070 */       }
/* 071 */       scan_batch = null;
/* 072 */       scan_nextBatch();
/* 073 */     }
/* 074 */     scan_scanTime.add(scan_scanTime1 / (1000 * 1000));
/* 075 */     scan_scanTime1 = 0;
/* 076 */
/* 077 */   }
/* 078 */
/* 079 */   private void scan_nextBatch() throws java.io.IOException {
/* 080 */     long getBatchStart = System.nanoTime();
/* 081 */     if (scan_input.hasNext()) {
/* 082 */       scan_batch = (org.apache.spark.sql.execution.vectorized.ColumnarBatch)scan_input.next();
/* 083 */       scan_numOutputRows.add(scan_batch.numRows());
/* 084 */       scan_batchIdx = 0;
/* 085 */
/* 086 */     }
/* 087 */     scan_scanTime1 += System.nanoTime() - getBatchStart;
/* 088 */   }
/* 089 */
/* 090 */   protected void processNext() throws java.io.IOException {
/* 091 */     while (!agg_initAgg) {
/* 092 */       agg_initAgg = true;
/* 093 */       long agg_beforeAgg = System.nanoTime();
/* 094 */       agg_doAggregateWithoutKey();
/* 095 */       agg_aggTime.add((System.nanoTime() - agg_beforeAgg) / 1000000);
/* 096 */
/* 097 */       // output the result
/* 098 */
/* 099 */       agg_numOutputRows.add(1);
/* 100 */       agg_rowWriter.zeroOutNullBytes();
/* 101 */
/* 102 */       if (agg_bufIsNull) {
/* 103 */         agg_rowWriter.setNullAt(0);
/* 104 */       } else {
/* 105 */         agg_rowWriter.write(0, agg_bufValue);
/* 106 */       }
/* 107 */       append(agg_result);
/* 108 */     }
/* 109 */   }
/* 110 */ }
```

Generated code with this PR
```java
/* 005 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 006 */   private Object[] references;
/* 007 */   private scala.collection.Iterator[] inputs;
/* 008 */   private boolean agg_initAgg;
/* 009 */   private boolean agg_bufIsNull;
/* 010 */   private long agg_bufValue;
/* 011 */   private scala.collection.Iterator scan_input;
/* 012 */   private org.apache.spark.sql.execution.metric.SQLMetric scan_numOutputRows;
/* 013 */   private org.apache.spark.sql.execution.metric.SQLMetric scan_scanTime;
/* 014 */   private long scan_scanTime1;
/* 015 */   private org.apache.spark.sql.execution.vectorized.ColumnarBatch scan_batch;
/* 016 */   private int scan_batchIdx;
/* 017 */   private org.apache.spark.sql.execution.metric.SQLMetric agg_numOutputRows;
/* 018 */   private org.apache.spark.sql.execution.metric.SQLMetric agg_aggTime;
/* 019 */   private UnsafeRow agg_result;
/* 020 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder agg_holder;
/* 021 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter agg_rowWriter;
/* 022 */
/* 023 */   public GeneratedIterator(Object[] references) {
/* 024 */     this.references = references;
/* 025 */   }
/* 026 */
/* 027 */   public void init(int index, scala.collection.Iterator[] inputs) {
/* 028 */     partitionIndex = index;
/* 029 */     this.inputs = inputs;
/* 030 */     agg_initAgg = false;
/* 031 */
/* 032 */     scan_input = inputs[0];
/* 033 */     this.scan_numOutputRows = (org.apache.spark.sql.execution.metric.SQLMetric) references[0];
/* 034 */     this.scan_scanTime = (org.apache.spark.sql.execution.metric.SQLMetric) references[1];
/* 035 */     scan_scanTime1 = 0;
/* 036 */     scan_batch = null;
/* 037 */     scan_batchIdx = 0;
/* 038 */     this.agg_numOutputRows = (org.apache.spark.sql.execution.metric.SQLMetric) references[2];
/* 039 */     this.agg_aggTime = (org.apache.spark.sql.execution.metric.SQLMetric) references[3];
/* 040 */     agg_result = new UnsafeRow(1);
/* 041 */     this.agg_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(agg_result, 0);
/* 042 */     this.agg_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(agg_holder, 1);
/* 043 */
/* 044 */   }
/* 045 */
/* 046 */   private void agg_doAggregateWithoutKey() throws java.io.IOException {
/* 047 */     // initialize aggregation buffer
/* 048 */     agg_bufIsNull = false;
/* 049 */     agg_bufValue = 0L;
/* 050 */
/* 051 */     if (scan_batch == null) {
/* 052 */       scan_nextBatch();
/* 053 */     }
/* 054 */     while (scan_batch != null) {
/* 055 */       int numRows = scan_batch.numRows();
/* 056 */       int scan_localEnd = numRows - scan_batchIdx;
/* 057 */       for (int scan_localIdx = 0; scan_localIdx < scan_localEnd; scan_localIdx++) {
/* 058 */         int scan_rowIdx = scan_batchIdx + scan_localIdx;
/* 059 */         // do aggregate
/* 060 */         // common sub-expressions
/* 061 */
/* 062 */         // evaluate aggregate function
/* 063 */         boolean agg_isNull1 = false;
/* 064 */
/* 065 */         long agg_value1 = -1L;
/* 066 */         agg_value1 = agg_bufValue + 1L;
/* 067 */         // update aggregation buffer
/* 068 */         agg_bufIsNull = false;
/* 069 */         agg_bufValue = agg_value1;
/* 070 */         // shouldStop check is eliminated
/* 071 */       }
/* 072 */       scan_batchIdx = numRows;
/* 073 */       scan_batch = null;
/* 074 */       scan_nextBatch();
/* 075 */     }
/* 079 */   }
/* 080 */
/* 081 */   private void scan_nextBatch() throws java.io.IOException {
/* 082 */     long getBatchStart = System.nanoTime();
/* 083 */     if (scan_input.hasNext()) {
/* 084 */       scan_batch = (org.apache.spark.sql.execution.vectorized.ColumnarBatch)scan_input.next();
/* 085 */       scan_numOutputRows.add(scan_batch.numRows());
/* 086 */       scan_batchIdx = 0;
/* 087 */
/* 088 */     }
/* 089 */     scan_scanTime1 += System.nanoTime() - getBatchStart;
/* 090 */   }
/* 091 */
/* 092 */   protected void processNext() throws java.io.IOException {
/* 093 */     while (!agg_initAgg) {
/* 094 */       agg_initAgg = true;
/* 095 */       long agg_beforeAgg = System.nanoTime();
/* 096 */       agg_doAggregateWithoutKey();
/* 097 */       agg_aggTime.add((System.nanoTime() - agg_beforeAgg) / 1000000);
/* 098 */
/* 099 */       // output the result
/* 100 */
/* 101 */       agg_numOutputRows.add(1);
/* 102 */       agg_rowWriter.zeroOutNullBytes();
/* 103 */
/* 104 */       if (agg_bufIsNull) {
/* 105 */         agg_rowWriter.setNullAt(0);
/* 106 */       } else {
/* 107 */         agg_rowWriter.write(0, agg_bufValue);
/* 108 */       }
/* 109 */       append(agg_result);
/* 110 */     }
/* 111 */   }
/* 112 */ }
```

## How was this patch tested?

Tested with existing test suites.

Author: Kazuaki Ishizaki <ishiz...@jp.ibm.com>

Closes #17378 from kiszk/SPARK-20046.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/93bb0b91
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/93bb0b91
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/93bb0b91

Branch: refs/heads/master
Commit: 93bb0b911b6c790fa369b39da51a83d8f62da909
Parents: 2422c86
Author: Kazuaki Ishizaki <ishiz...@jp.ibm.com>
Authored: Sun Mar 26 09:20:22 2017 +0200
Committer: Herman van Hovell <hvanhov...@databricks.com>
Committed: Sun Mar 26 09:20:22 2017 +0200

----------------------------------------------------------------------
 .../spark/sql/execution/ColumnarBatchScan.scala   | 18 ++++++++++++++----
 1 file changed, 14 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/93bb0b91/sql/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBatchScan.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBatchScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBatchScan.scala
index 04fba17..e861166 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBatchScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBatchScan.scala
@@ -111,17 +111,27 @@ private[sql] trait ColumnarBatchScan extends CodegenSupport {
     val columnsBatchInput = (output zip colVars).map { case (attr, colVar) =>
       genCodeColumnVector(ctx, colVar, rowidx, attr.dataType, attr.nullable)
     }
+    val localIdx = ctx.freshName("localIdx")
+    val localEnd = ctx.freshName("localEnd")
+    val numRows = ctx.freshName("numRows")
+    val shouldStop = if (isShouldStopRequired) {
+      s"if (shouldStop()) { $idx = $rowidx + 1; return; }"
+    } else {
+      "// shouldStop check is eliminated"
+    }
     s"""
        |if ($batch == null) {
        |  $nextBatch();
        |}
        |while ($batch != null) {
-       |  int numRows = $batch.numRows();
-       |  while ($idx < numRows) {
-       |    int $rowidx = $idx++;
+       |  int $numRows = $batch.numRows();
+       |  int $localEnd = $numRows - $idx;
+       |  for (int $localIdx = 0; $localIdx < $localEnd; $localIdx++) {
+       |    int $rowidx = $idx + $localIdx;
        |    ${consume(ctx, columnsBatchInput).trim}
-       |    if (shouldStop()) return;
+       |    $shouldStop
        |  }
+       |  $idx = $numRows;
        |  $batch = null;
        |  $nextBatch();
        |}
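
When the `shouldStop()` check is required (i.e. rows are emitted downstream), the 
generated loop must stay resumable across `processNext()` calls: before returning it 
saves its progress into the instance field (`$idx = $rowidx + 1`), and on re-entry it 
recomputes the local bound from that field. A simplified, hypothetical Java sketch of 
that control flow (names are made up; this is not the actual generated class):

```java
// Hypothetical illustration of the resumable batch loop; not Spark's generated code.
class ResumableBatchLoop {
  private int batchIdx;      // instance field: survives between processNext() calls
  private int numRows;       // rows in the current batch

  void consumeBatch() {
    int localEnd = numRows - batchIdx;                // rows still unconsumed
    for (int localIdx = 0; localIdx < localEnd; localIdx++) {
      int rowIdx = batchIdx + localIdx;
      emitRow(rowIdx);                                // stands in for consume(...)
      if (shouldStop()) {
        batchIdx = rowIdx + 1;                        // save progress for the next call
        return;
      }
    }
    batchIdx = numRows;                               // whole batch consumed
  }

  private void emitRow(int rowIdx) { /* append a row downstream */ }
  private boolean shouldStop() { return false; }      // placeholder
}
```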

