[ https://issues.apache.org/jira/browse/SPARK-23676?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Apache Spark reassigned SPARK-23676:
------------------------------------

    Assignee:     (was: Apache Spark)

> Support left join codegen in SortMergeJoinExec
> ----------------------------------------------
>
>                 Key: SPARK-23676
>                 URL: https://issues.apache.org/jira/browse/SPARK-23676
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 2.4.0
>            Reporter: caoxuewen
>            Priority: Major
>
> This PR generates Java code that implements the LeftOuter case of
> `SortMergeJoinExec` directly, without going through an iterator.
> Generating the join loop inline is what improves runtime performance.
> JoinBenchmark result: **1.3x**
> ```
> Java HotSpot(TM) 64-Bit Server VM 1.8.0_60-b27 on Windows 7 6.1
> Intel(R) Core(TM) i5-6500 CPU @ 3.20GHz
> left sort merge join:            Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
> ------------------------------------------------------------------------------------------
> left merge join wholestage=off        2439 / 2575          0.9        1163.0       1.0X
> left merge join wholestage=on         1890 / 1904          1.1         901.1       1.3X
> ```
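>
> The benchmark program below appears to use Spark's JoinBenchmark harness (`runBenchmark`); outside of it, a minimal sketch toggling whole-stage codegen through the `spark.sql.codegen.wholeStage` conf gives a rough version of the same comparison (the `spark` session name and the crude wall-clock timing are assumptions, not part of the harness):
> ```
> import org.apache.spark.sql.functions.col
>
> val N = 2 << 20
> for (codegen <- Seq(false, true)) {
>   // Run the same join with whole-stage code generation off, then on.
>   spark.conf.set("spark.sql.codegen.wholeStage", codegen)
>   val df1 = spark.range(N).selectExpr(s"(id * 15485863) % ${N * 10} as k1")
>   val df2 = spark.range(N).selectExpr(s"(id * 15485867) % ${N * 10} as k2")
>   val start = System.nanoTime()
>   df1.join(df2, col("k1") === col("k2"), "left").count()  // forces execution
>   println(s"wholestage=$codegen: ${(System.nanoTime() - start) / 1000000} ms")
> }
> ```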
> Benchmark program
> ```
>   val N = 2 << 20
>   runBenchmark("left sort merge join", N) {
>     val df1 = sparkSession.range(N)
>       .selectExpr(s"(id * 15485863) % ${N*10} as k1")
>     val df2 = sparkSession.range(N)
>       .selectExpr(s"(id * 15485867) % ${N*10} as k2")
>     val df = df1.join(df2, col("k1") === col("k2"), "left")
>     assert(df.queryExecution.sparkPlan.find(_.isInstanceOf[SortMergeJoinExec]).isDefined)
>     df.count()
>   }
> ```
> Code example
> ```
> import org.apache.spark.sql.functions.col
>
> val df1 = spark.range(2 << 20).selectExpr("id as k1", "id * 2 as v1")
> val df2 = spark.range(2 << 20).selectExpr("id as k2", "id * 3 as v2")
> df1.join(df2, col("k1") === col("k2") && col("v1") < col("v2"), "left").collect()
> ```
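>
> A dump like the one below can be printed with Spark's codegen debugger (a sketch; `debugCodegen` is added by the implicits in `org.apache.spark.sql.execution.debug`):
> ```
> import org.apache.spark.sql.execution.debug._
>
> // Prints the generated Java source of every whole-stage-codegen subtree,
> // including the sort merge join stage shown below.
> df1.join(df2, col("k1") === col("k2") && col("v1") < col("v2"), "left").debugCodegen()
> ```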
> Generated code
> ```
> /* 001 */ public Object generate(Object[] references) {
> /* 002 */   return new GeneratedIteratorForCodegenStage5(references);
> /* 003 */ }
> /* 004 */
> /* 005 */ // codegenStageId=5
> /* 006 */ final class GeneratedIteratorForCodegenStage5 extends org.apache.spark.sql.execution.BufferedRowIterator {
> /* 007 */   private Object[] references;
> /* 008 */   private scala.collection.Iterator[] inputs;
> /* 009 */   private scala.collection.Iterator smj_leftInput;
> /* 010 */   private scala.collection.Iterator smj_rightInput;
> /* 011 */   private InternalRow smj_leftRow;
> /* 012 */   private InternalRow smj_rightRow;
> /* 013 */   private long smj_value2;
> /* 014 */   private org.apache.spark.sql.execution.ExternalAppendOnlyUnsafeRowArray smj_matches;
> /* 015 */   private long smj_value3;
> /* 016 */   private long smj_value4;
> /* 017 */   private long smj_value5;
> /* 018 */   private long smj_value6;
> /* 019 */   private boolean smj_isNull2;
> /* 020 */   private long smj_value7;
> /* 021 */   private boolean smj_isNull3;
> /* 022 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder[] smj_mutableStateArray1 = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder[1];
> /* 023 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[] smj_mutableStateArray2 = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[1];
> /* 024 */   private UnsafeRow[] smj_mutableStateArray = new UnsafeRow[1];
> /* 025 */
> /* 026 */   public GeneratedIteratorForCodegenStage5(Object[] references) {
> /* 027 */     this.references = references;
> /* 028 */   }
> /* 029 */
> /* 030 */   public void init(int index, scala.collection.Iterator[] inputs) {
> /* 031 */     partitionIndex = index;
> /* 032 */     this.inputs = inputs;
> /* 033 */     smj_leftInput = inputs[0];
> /* 034 */     smj_rightInput = inputs[1];
> /* 035 */
> /* 036 */     smj_matches = new org.apache.spark.sql.execution.ExternalAppendOnlyUnsafeRowArray(2147483647, 2147483647);
> /* 037 */     smj_mutableStateArray[0] = new UnsafeRow(4);
> /* 038 */     smj_mutableStateArray1[0] = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(smj_mutableStateArray[0], 0);
> /* 039 */     smj_mutableStateArray2[0] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(smj_mutableStateArray1[0], 4);
> /* 040 */
> /* 041 */   }
> /* 042 */
> /* 043 */   private void writeJoinRows() throws java.io.IOException {
> /* 044 */     smj_mutableStateArray2[0].zeroOutNullBytes();
> /* 045 */
> /* 046 */     smj_mutableStateArray2[0].write(0, smj_value4);
> /* 047 */
> /* 048 */     smj_mutableStateArray2[0].write(1, smj_value5);
> /* 049 */
> /* 050 */     if (smj_isNull2) {
> /* 051 */       smj_mutableStateArray2[0].setNullAt(2);
> /* 052 */     } else {
> /* 053 */       smj_mutableStateArray2[0].write(2, smj_value6);
> /* 054 */     }
> /* 055 */
> /* 056 */     if (smj_isNull3) {
> /* 057 */       smj_mutableStateArray2[0].setNullAt(3);
> /* 058 */     } else {
> /* 059 */       smj_mutableStateArray2[0].write(3, smj_value7);
> /* 060 */     }
> /* 061 */     append(smj_mutableStateArray[0].copy());
> /* 062 */
> /* 063 */   }
> /* 064 */
> /* 065 */   private boolean findNextJoinRows(
> /* 066 */     scala.collection.Iterator leftIter,
> /* 067 */     scala.collection.Iterator rightIter) {
> /* 068 */     smj_leftRow = null;
> /* 069 */     int comp = 0;
> /* 070 */     while (smj_leftRow == null) {
> /* 071 */       if (!leftIter.hasNext()) return false;
> /* 072 */       smj_leftRow = (InternalRow) leftIter.next();
> /* 073 */
> /* 074 */       long smj_value = smj_leftRow.getLong(0);
> /* 075 */       if (false) {
> /* 076 */         if (!smj_matches.isEmpty()) {
> /* 077 */           smj_matches.clear();
> /* 078 */         }
> /* 079 */         return true;
> /* 080 */       }
> /* 081 */       if (!smj_matches.isEmpty()) {
> /* 082 */         comp = 0;
> /* 083 */         if (comp == 0) {
> /* 084 */           comp = (smj_value > smj_value3 ? 1 : smj_value < smj_value3 ? -1 : 0);
> /* 085 */         }
> /* 086 */
> /* 087 */         if (comp == 0) {
> /* 088 */           return true;
> /* 089 */         }
> /* 090 */         smj_matches.clear();
> /* 091 */       }
> /* 092 */
> /* 093 */       do {
> /* 094 */         if (smj_rightRow == null) {
> /* 095 */           if (!rightIter.hasNext()) {
> /* 096 */             smj_value3 = smj_value;
> /* 097 */             return true;
> /* 098 */           }
> /* 099 */           smj_rightRow = (InternalRow) rightIter.next();
> /* 100 */
> /* 101 */           long smj_value1 = smj_rightRow.getLong(0);
> /* 102 */           if (false) {
> /* 103 */             smj_rightRow = null;
> /* 104 */             continue;
> /* 105 */           }
> /* 106 */           smj_value2 = smj_value1;
> /* 107 */         }
> /* 108 */
> /* 109 */         comp = 0;
> /* 110 */         if (comp == 0) {
> /* 111 */           comp = (smj_value > smj_value2 ? 1 : smj_value < smj_value2 ? -1 : 0);
> /* 112 */         }
> /* 113 */
> /* 114 */         if (comp > 0) {
> /* 115 */           smj_rightRow = null;
> /* 116 */         } else if (comp < 0) {
> /* 117 */           if (!smj_matches.isEmpty()) {
> /* 118 */             smj_value3 = smj_value;
> /* 119 */           }
> /* 120 */           return true;
> /* 121 */         } else {
> /* 122 */           smj_matches.add((UnsafeRow) smj_rightRow);
> /* 123 */           smj_rightRow = null;
> /* 124 */         }
> /* 125 */       } while (smj_leftRow != null);
> /* 126 */     }
> /* 127 */     return false; // unreachable
> /* 128 */   }
> /* 129 */
> /* 130 */   protected void processNext() throws java.io.IOException {
> /* 131 */     while (findNextJoinRows(smj_leftInput, smj_rightInput)) {
> /* 132 */       boolean smj_loaded = false;
> /* 133 */       smj_value4 = smj_leftRow.getLong(0);
> /* 134 */       smj_value5 = smj_leftRow.getLong(1);
> /* 135 */       scala.collection.Iterator<UnsafeRow> smj_iterator = smj_matches.generateIterator();
> /* 136 */       while (smj_iterator.hasNext()) {
> /* 137 */         InternalRow smj_rightRow1 = (InternalRow) smj_iterator.next();
> /* 138 */         smj_isNull3 = smj_rightRow1.isNullAt(1);
> /* 139 */         smj_value7 = smj_rightRow1.getLong(1);
> /* 140 */         boolean smj_isNull4 = true;
> /* 141 */         boolean smj_value8 = false;
> /* 142 */
> /* 143 */         if (!smj_isNull3) {
> /* 144 */           smj_isNull4 = false; // resultCode could change nullability.
> /* 145 */           smj_value8 = smj_value5 < smj_value7;
> /* 146 */
> /* 147 */         }
> /* 148 */         if (smj_isNull4 || !smj_value8) continue;
> /* 149 */         smj_isNull2 = smj_rightRow1.isNullAt(0);
> /* 150 */         smj_value6 = smj_rightRow1.getLong(0);
> /* 151 */         ((org.apache.spark.sql.execution.metric.SQLMetric) references[0] /* numOutputRows */).add(1);
> /* 152 */         smj_loaded = true;
> /* 153 */         writeJoinRows();
> /* 154 */       }
> /* 155 */       if (!smj_loaded) {
> /* 156 */         smj_isNull2 = true;
> /* 157 */         smj_isNull3 = true;
> /* 158 */         writeJoinRows();
> /* 159 */       }
> /* 160 */       if (shouldStop()) return;
> /* 161 */     }
> /* 162 */   }
> /* 163 */
> /* 164 */ }
> ```
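>
> For readers unfamiliar with the iterator path this replaces, a rough Scala sketch of the same left outer merge-join logic over two key-sorted inputs (illustrative only; the generated code above streams rows and buffers the matched run in an ExternalAppendOnlyUnsafeRowArray rather than materializing sequences):
> ```
> // Both inputs must be sorted ascending by key. For each left row, scan the
> // run of right rows with an equal key; if no right row survives the extra
> // condition, emit the left row padded with None (the LeftOuter case).
> def leftOuterMergeJoin[L, R](
>     left: Seq[L], right: Seq[R])(
>     leftKey: L => Long, rightKey: R => Long)(
>     cond: (L, R) => Boolean): Seq[(L, Option[R])] = {
>   val out = scala.collection.mutable.ArrayBuffer.empty[(L, Option[R])]
>   var ri = 0
>   for (l <- left) {
>     val k = leftKey(l)
>     while (ri < right.length && rightKey(right(ri)) < k) ri += 1  // skip smaller keys
>     var j = ri
>     var matched = false
>     while (j < right.length && rightKey(right(j)) == k) {         // equal-key run
>       if (cond(l, right(j))) { out += ((l, Some(right(j)))); matched = true }
>       j += 1
>     }
>     if (!matched) out += ((l, None))                              // null-padded row
>   }
>   out.toSeq
> }
> ```
> For example, `leftOuterMergeJoin(Seq(1L, 2L), Seq(2L, 2L))(identity, identity)((_, _) => true)` yields `(1,None)` plus two `(2,Some(2))` rows, matching left outer semantics.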


