GitHub user kiszk opened a pull request: https://github.com/apache/spark/pull/20637
Remove redundant null checks in generated Java code by GenerateUnsafeProjection

## What changes were proposed in this pull request?

This PR addresses one of the TODOs in `GenerateUnsafeProjection`, "if the nullability of field is correct, we can use it to save null check", to simplify the generated code. When a field is declared with `nullable = false` in its `DataType`, `GenerateUnsafeProjection` now omits the null checks from the generated Java code. The following is an example.

Source code:

```
val dataType3 = (new StructType)
  .add("a", StringType, nullable = false)
  .add("b", StringType, nullable = false)
  .add("c", StringType, nullable = false)
val exprs3 = BoundReference(0, dataType3, nullable = false) :: Nil
val projection3 = GenerateUnsafeProjection.generate(exprs3)
projection3.apply(InternalRow(AlwaysNonNull))
```
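`AlwaysNonNull` is a helper defined in the new test cases. To try the same projection without it, one can pass a concrete inner row whose fields are simply non-null; a minimal Scala sketch, assuming the usual Catalyst test imports (the `inner` name and the literal values are illustrative only):

```
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
import org.apache.spark.unsafe.types.UTF8String

// An inner struct with three non-null string fields, matching dataType3.
val inner = new GenericInternalRow(Array[Any](
  UTF8String.fromString("a"),
  UTF8String.fromString("b"),
  UTF8String.fromString("c")))

// BoundReference(0, dataType3, nullable = false) reads the struct from ordinal 0.
val result = projection3.apply(InternalRow(inner))
```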
Generated code without this PR:

```
/* 001 */ public java.lang.Object generate(Object[] references) {
/* 002 */   return new SpecificUnsafeProjection(references);
/* 003 */ }
/* 004 */
/* 005 */ class SpecificUnsafeProjection extends org.apache.spark.sql.catalyst.expressions.UnsafeProjection {
/* 006 */
/* 007 */   private Object[] references;
/* 008 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder[] mutableStateArray1 = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder[1];
/* 009 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[] mutableStateArray2 = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[2];
/* 010 */   private UnsafeRow[] mutableStateArray = new UnsafeRow[1];
/* 011 */
/* 012 */   public SpecificUnsafeProjection(Object[] references) {
/* 013 */     this.references = references;
/* 014 */     mutableStateArray[0] = new UnsafeRow(1);
/* 015 */     mutableStateArray1[0] = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(mutableStateArray[0], 32);
/* 016 */     mutableStateArray2[0] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(mutableStateArray1[0], 1);
/* 017 */     mutableStateArray2[1] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(mutableStateArray1[0], 3);
/* 018 */
/* 019 */   }
/* 020 */
/* 021 */   public void initialize(int partitionIndex) {
/* 022 */
/* 023 */   }
/* 024 */
/* 025 */   // Scala.Function1 need this
/* 026 */   public java.lang.Object apply(java.lang.Object row) {
/* 027 */     return apply((InternalRow) row);
/* 028 */   }
/* 029 */
/* 030 */   public UnsafeRow apply(InternalRow i) {
/* 031 */     mutableStateArray1[0].reset();
/* 032 */
/* 033 */     InternalRow value = i.getStruct(0, 3);
/* 034 */     // Remember the current cursor so that we can calculate how many bytes are
/* 035 */     // written later.
/* 036 */     final int tmpCursor = mutableStateArray1[0].cursor;
/* 037 */
/* 038 */     final InternalRow tmpInput = value;
/* 039 */     if (tmpInput instanceof UnsafeRow) {
/* 040 */
/* 041 */       final int sizeInBytes = ((UnsafeRow) tmpInput).getSizeInBytes();
/* 042 */       // grow the global buffer before writing data.
/* 043 */       mutableStateArray1[0].grow(sizeInBytes);
/* 044 */       ((UnsafeRow) tmpInput).writeToMemory(mutableStateArray1[0].buffer, mutableStateArray1[0].cursor);
/* 045 */       mutableStateArray1[0].cursor += sizeInBytes;
/* 046 */
/* 047 */     } else {
/* 048 */       mutableStateArray2[1].reset();
/* 049 */
/* 050 */
/* 051 */       if (tmpInput.isNullAt(0)) {
/* 052 */         mutableStateArray2[1].setNullAt(0);
/* 053 */       } else {
/* 054 */         mutableStateArray2[1].write(0, tmpInput.getUTF8String(0));
/* 055 */       }
/* 056 */
/* 057 */
/* 058 */       if (tmpInput.isNullAt(1)) {
/* 059 */         mutableStateArray2[1].setNullAt(1);
/* 060 */       } else {
/* 061 */         mutableStateArray2[1].write(1, tmpInput.getUTF8String(1));
/* 062 */       }
/* 063 */
/* 064 */
/* 065 */       if (tmpInput.isNullAt(2)) {
/* 066 */         mutableStateArray2[1].setNullAt(2);
/* 067 */       } else {
/* 068 */         mutableStateArray2[1].write(2, tmpInput.getUTF8String(2));
/* 069 */       }
/* 070 */     }
/* 071 */
/* 072 */     mutableStateArray2[0].setOffsetAndSize(0, tmpCursor, mutableStateArray1[0].cursor - tmpCursor);
/* 073 */     mutableStateArray[0].setTotalSize(mutableStateArray1[0].totalSize());
/* 074 */     return mutableStateArray[0];
/* 075 */   }
/* 076 */
/* 077 */
/* 078 */ }
```

Generated code with this PR:

```
/* 001 */ public java.lang.Object generate(Object[] references) {
/* 002 */   return new SpecificUnsafeProjection(references);
/* 003 */ }
/* 004 */
/* 005 */ class SpecificUnsafeProjection extends org.apache.spark.sql.catalyst.expressions.UnsafeProjection {
/* 006 */
/* 007 */   private Object[] references;
/* 008 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder[] mutableStateArray1 = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder[1];
/* 009 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[] mutableStateArray2 = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[2];
/* 010 */   private UnsafeRow[] mutableStateArray = new UnsafeRow[1];
/* 011 */
/* 012 */   public SpecificUnsafeProjection(Object[] references) {
/* 013 */     this.references = references;
/* 014 */     mutableStateArray[0] = new UnsafeRow(1);
/* 015 */     mutableStateArray1[0] = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(mutableStateArray[0], 32);
/* 016 */     mutableStateArray2[0] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(mutableStateArray1[0], 1);
/* 017 */     mutableStateArray2[1] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(mutableStateArray1[0], 3);
/* 018 */
/* 019 */   }
/* 020 */
/* 021 */   public void initialize(int partitionIndex) {
/* 022 */
/* 023 */   }
/* 024 */
/* 025 */   // Scala.Function1 need this
/* 026 */   public java.lang.Object apply(java.lang.Object row) {
/* 027 */     return apply((InternalRow) row);
/* 028 */   }
/* 029 */
/* 030 */   public UnsafeRow apply(InternalRow i) {
/* 031 */     mutableStateArray1[0].reset();
/* 032 */
/* 033 */     InternalRow value = i.getStruct(0, 3);
/* 034 */     // Remember the current cursor so that we can calculate how many bytes are
/* 035 */     // written later.
/* 036 */     final int tmpCursor = mutableStateArray1[0].cursor;
/* 037 */
/* 038 */     final InternalRow tmpInput = value;
/* 039 */     if (tmpInput instanceof UnsafeRow) {
/* 040 */
/* 041 */       final int sizeInBytes = ((UnsafeRow) tmpInput).getSizeInBytes();
/* 042 */       // grow the global buffer before writing data.
/* 043 */       mutableStateArray1[0].grow(sizeInBytes);
/* 044 */       ((UnsafeRow) tmpInput).writeToMemory(mutableStateArray1[0].buffer, mutableStateArray1[0].cursor);
/* 045 */       mutableStateArray1[0].cursor += sizeInBytes;
/* 046 */
/* 047 */     } else {
/* 048 */       mutableStateArray2[1].reset();
/* 049 */
/* 050 */
/* 051 */       mutableStateArray2[1].write(0, tmpInput.getUTF8String(0));
/* 052 */
/* 053 */
/* 054 */       mutableStateArray2[1].write(1, tmpInput.getUTF8String(1));
/* 055 */
/* 056 */
/* 057 */       mutableStateArray2[1].write(2, tmpInput.getUTF8String(2));
/* 058 */     }
/* 059 */
/* 060 */     mutableStateArray2[0].setOffsetAndSize(0, tmpCursor, mutableStateArray1[0].cursor - tmpCursor);
/* 061 */     mutableStateArray[0].setTotalSize(mutableStateArray1[0].totalSize());
/* 062 */     return mutableStateArray[0];
/* 063 */   }
/* 064 */
/* 065 */
/* 066 */ }
```
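The difference is confined to the `else` branch: with the schema declared non-nullable, the `isNullAt`/`setNullAt` wrapper disappears. Conceptually, the generator now gates that wrapper on the field's nullability; a minimal Scala sketch of the idea (hypothetical helper, not the actual `GenerateUnsafeProjection` code; `rowWriter` and `tmpInput` stand in for the generated names above):

```
// Hypothetical sketch: emit the null check only when the schema allows nulls.
def writeStringField(index: Int, nullable: Boolean): String = {
  val write = s"rowWriter.write($index, tmpInput.getUTF8String($index));"
  if (nullable) {
    s"""|if (tmpInput.isNullAt($index)) {
        |  rowWriter.setNullAt($index);
        |} else {
        |  $write
        |}""".stripMargin
  } else {
    write // nullable = false: the null check would be dead code
  }
}

// For a non-nullable field, only the unconditional write is emitted.
println(writeStringField(0, nullable = false))
```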
## How was this patch tested?

Added new test cases to `GenerateUnsafeProjectionSuite`.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/kiszk/spark SPARK-23466

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/20637.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #20637

----

commit e2e9e3604284949d9d762274e2e1f55348851073
Author: Kazuaki Ishizaki <ishizaki@...>
Date:   2018-02-19T19:31:36Z

    initial commit

----
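For reference, a test exercising the new behavior might look like the following Scala sketch (hypothetical schema and values; the actual additions are the new cases in `GenerateUnsafeProjectionSuite`):

```
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.BoundReference
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
import org.apache.spark.sql.types.{StringType, StructType}
import org.apache.spark.unsafe.types.UTF8String

// Project a struct of non-nullable strings and verify the values survive
// the round trip through the generated code.
val schema = (new StructType)
  .add("a", StringType, nullable = false)
  .add("b", StringType, nullable = false)
val projection = GenerateUnsafeProjection.generate(
  BoundReference(0, schema, nullable = false) :: Nil)
val input = InternalRow(InternalRow(UTF8String.fromString("x"), UTF8String.fromString("y")))
val row = projection.apply(input).getStruct(0, 2)
assert(row.getUTF8String(0).toString == "x")
assert(row.getUTF8String(1).toString == "y")
```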