[jira] [Assigned] (SPARK-23676) Support left join codegen in SortMergeJoinExec

2018-03-14 Thread Apache Spark (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-23676?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Apache Spark reassigned SPARK-23676:


Assignee: Apache Spark

> Support left join codegen in SortMergeJoinExec
> --
>
> Key: SPARK-23676
> URL: https://issues.apache.org/jira/browse/SPARK-23676
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.4.0
>Reporter: caoxuewen
>Assignee: Apache Spark
>Priority: Major
>
> This PR generates Java code that performs the LeftOuter join directly in 
> `SortMergeJoinExec` instead of going through an iterator. 
> Generating the Java code improves runtime performance.
> joinBenchmark result: **1.3x**
> ```
> Java HotSpot(TM) 64-Bit Server VM 1.8.0_60-b27 on Windows 7 6.1
> Intel(R) Core(TM) i5-6500 CPU @ 3.20GHz
> left sort merge join:            Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
> -----------------------------------------------------------------------------------------
> left merge join wholestage=off         2439 / 2575          0.9        1163.0       1.0X
> left merge join wholestage=on          1890 / 1904          1.1         901.1       1.3X
> ```
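The wholestage=off/on rows come from running the same query with whole-stage code generation disabled and then enabled. Assuming the standard Spark SQL flag (not something introduced by this PR), the toggle is a one-line configuration, e.g. in `spark-defaults.conf`:

```
# produces the wholestage=off baseline row
spark.sql.codegen.wholeStage=false
# produces the wholestage=on codegen row
spark.sql.codegen.wholeStage=true
```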
> Benchmark program
> ```
>  val N = 2 << 20
>  runBenchmark("left sort merge join", N) {
>    val df1 = sparkSession.range(N)
>      .selectExpr(s"(id * 15485863) % ${N*10} as k1")
>    val df2 = sparkSession.range(N)
>      .selectExpr(s"(id * 15485867) % ${N*10} as k2")
>    val df = df1.join(df2, col("k1") === col("k2"), "left")
>    assert(df.queryExecution.sparkPlan.find(_.isInstanceOf[SortMergeJoinExec]).isDefined)
>    df.count()
>  }
> ```
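For scale, `2 << 20` is 2^21 = 2,097,152 rows per side, and taking the prime-multiplied id modulo `N*10` scatters the join keys over a domain ten times larger than the row count. A small-scale check of the same scheme (the reduced `n` here is purely illustrative, not from the benchmark):

```python
N = 2 << 20
assert N == 2 ** 21 == 2_097_152  # rows per side in the benchmark above

# The same key-scattering scheme at a reduced scale:
n = 1 << 10
domain = n * 10
k1 = {(i * 15485863) % domain for i in range(n)}
k2 = {(i * 15485867) % domain for i in range(n)}
# Both multipliers are odd and not divisible by 5, hence coprime with the
# domain, so each side yields n distinct keys spread over 10n slots.
```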
> Code example
> ```
> val df1 = spark.range(2 << 20).selectExpr("id as k1", "id * 2 as v1")
> val df2 = spark.range(2 << 20).selectExpr("id as k2", "id * 3 as v2")
> df1.join(df2, col("k1") === col("k2") && col("v1") < col("v2"), "left").collect
> ```
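To make clear what the generated code below is doing, here is a minimal, Spark-independent sketch of a sort-merge left outer join in Python (illustrative only: the names and structure are mine, not Spark's, and the extra non-equi condition `v1 < v2` from the example above is omitted):

```python
def sort_merge_left_join(left, right, key_left, key_right):
    """Left outer sort-merge join; unmatched left rows pair with None."""
    left = sorted(left, key=lambda r: r[key_left])
    right = sorted(right, key=lambda r: r[key_right])
    out, j = [], 0
    for lrow in left:
        # Advance the right side past keys smaller than the current left key.
        while j < len(right) and right[j][key_right] < lrow[key_left]:
            j += 1
        # Emit one output row per matching right row (Spark buffers these
        # matches in the smj_matches array seen in the generated code).
        k, matched = j, False
        while k < len(right) and right[k][key_right] == lrow[key_left]:
            out.append((lrow, right[k]))
            matched = True
            k += 1
        if not matched:
            out.append((lrow, None))  # LeftOuter keeps unmatched left rows
    return out

left = [{"k1": 1, "v1": 2}, {"k1": 2, "v1": 4}]
right = [{"k2": 1, "v2": 3}]
rows = sort_merge_left_join(left, right, "k1", "k2")
```

The `None` padding on the right side corresponds to the `setNullAt` branches in the generated `writeJoinRows` method.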
> Generated code
> ```
> /* 001 */ public Object generate(Object[] references) {
> /* 002 */ return new GeneratedIteratorForCodegenStage5(references);
> /* 003 */ }
> /* 004 */
> /* 005 */ // codegenStageId=5
> /* 006 */ final class GeneratedIteratorForCodegenStage5 extends org.apache.spark.sql.execution.BufferedRowIterator {
> /* 007 */ private Object[] references;
> /* 008 */ private scala.collection.Iterator[] inputs;
> /* 009 */ private scala.collection.Iterator smj_leftInput;
> /* 010 */ private scala.collection.Iterator smj_rightInput;
> /* 011 */ private InternalRow smj_leftRow;
> /* 012 */ private InternalRow smj_rightRow;
> /* 013 */ private long smj_value2;
> /* 014 */ private org.apache.spark.sql.execution.ExternalAppendOnlyUnsafeRowArray smj_matches;
> /* 015 */ private long smj_value3;
> /* 016 */ private long smj_value4;
> /* 017 */ private long smj_value5;
> /* 018 */ private long smj_value6;
> /* 019 */ private boolean smj_isNull2;
> /* 020 */ private long smj_value7;
> /* 021 */ private boolean smj_isNull3;
> /* 022 */ private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder[] smj_mutableStateArray1 = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder[1];
> /* 023 */ private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[] smj_mutableStateArray2 = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[1];
> /* 024 */ private UnsafeRow[] smj_mutableStateArray = new UnsafeRow[1];
> /* 025 */
> /* 026 */ public GeneratedIteratorForCodegenStage5(Object[] references) {
> /* 027 */ this.references = references;
> /* 028 */ }
> /* 029 */
> /* 030 */ public void init(int index, scala.collection.Iterator[] inputs) {
> /* 031 */ partitionIndex = index;
> /* 032 */ this.inputs = inputs;
> /* 033 */ smj_leftInput = inputs[0];
> /* 034 */ smj_rightInput = inputs[1];
> /* 035 */
> /* 036 */ smj_matches = new org.apache.spark.sql.execution.ExternalAppendOnlyUnsafeRowArray(2147483647, 2147483647);
> /* 037 */ smj_mutableStateArray[0] = new UnsafeRow(4);
> /* 038 */ smj_mutableStateArray1[0] = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(smj_mutableStateArray[0], 0);
> /* 039 */ smj_mutableStateArray2[0] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(smj_mutableStateArray1[0], 4);
> /* 040 */
> /* 041 */ }
> /* 042 */
> /* 043 */ private void writeJoinRows() throws java.io.IOException {
> /* 044 */ smj_mutableStateArray2[0].zeroOutNullBytes();
> /* 045 */
> /* 046 */ smj_mutableStateArray2[0].write(0, smj_value4);
> /* 047 */
> /* 048 */ smj_mutableStateArray2[0].write(1, smj_value5);
> /* 049 */
> /* 050 */ if (smj_isNull2) {
> /* 051 */ smj_mutableStateArray2[0].setNullAt(2);
> /* 052 */ } else {
> /* 053 */ smj_mutableStateArray2[0].write(2, smj_value6);
> /* 054 */ }
> /* 055 */
> /* 056 */ if (smj_isNull3) {
> /* 057 */ smj_mutableStateArray2[0].setNullAt(3);
> /* 058 */ } else {
> /* 059 */ smj_mutableStateArray2[0].write(3, smj_value7);

[jira] [Assigned] (SPARK-23676) Support left join codegen in SortMergeJoinExec

2018-03-14 Thread Apache Spark (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-23676?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Apache Spark reassigned SPARK-23676:


Assignee: (was: Apache Spark)
