Repository: flink Updated Branches: refs/heads/release-1.4 471e067dc -> 963f14e26
[FLINK-9229] [table] Fix literal handling in code generation This closes #5898. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/963f14e2 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/963f14e2 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/963f14e2 Branch: refs/heads/release-1.4 Commit: 963f14e266e5347d98388552014c2be772d423bb Parents: 471e067 Author: Timo Walther <twal...@apache.org> Authored: Mon Apr 23 13:28:21 2018 +0200 Committer: Timo Walther <twal...@apache.org> Committed: Wed Apr 25 16:55:08 2018 +0200 ---------------------------------------------------------------------- .../flink/table/codegen/CodeGenerator.scala | 58 +++++++++++--------- .../codegen/calls/CurrentTimePointCallGen.scala | 10 ++-- .../codegen/calls/ScalarFunctionCallGen.scala | 2 +- .../table/runtime/batch/table/CalcITCase.scala | 2 +- 4 files changed, 40 insertions(+), 32 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/963f14e2/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala index 609a4c5..ee7bdb6 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala @@ -36,13 +36,13 @@ import org.apache.flink.streaming.api.functions.ProcessFunction import org.apache.flink.table.api.{TableConfig, TableException} import org.apache.flink.table.calcite.FlinkTypeFactory import org.apache.flink.table.codegen.CodeGenUtils._ -import org.apache.flink.table.codegen.GeneratedExpression.{NEVER_NULL, NO_CODE} -import org.apache.flink.table.codegen.calls.{CurrentTimePointCallGen, FunctionGenerator} +import org.apache.flink.table.codegen.GeneratedExpression.{ALWAYS_NULL, NEVER_NULL, NO_CODE} import org.apache.flink.table.codegen.calls.ScalarOperators._ +import org.apache.flink.table.codegen.calls.{CurrentTimePointCallGen, FunctionGenerator} import org.apache.flink.table.functions.sql.{ProctimeSqlFunction, ScalarSqlFunctions, StreamRecordTimestampSqlFunction} import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils -import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo import org.apache.flink.table.functions.{FunctionContext, UserDefinedFunction} +import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo import org.apache.flink.table.typeutils.TypeCheckUtils._ import org.joda.time.format.DateTimeFormatter @@ -1104,7 +1104,7 @@ abstract class CodeGenerator( case ObjectFieldAccessor(field) => // primitive if (isFieldPrimitive(field)) { - generateNonNullLiteral(fieldType, s"$inputTerm.${field.getName}") + generateTerm(fieldType, s"$inputTerm.${field.getName}") } // Object else { @@ -1133,7 +1133,7 @@ abstract class CodeGenerator( val reflectiveAccessCode = reflectiveFieldReadAccess(fieldTerm, field, inputTerm) // primitive if (isFieldPrimitive(field)) { - generateNonNullLiteral(fieldType, reflectiveAccessCode) + generateTerm(fieldType, reflectiveAccessCode) } // Object else { @@ -1153,16 +1153,16 @@ abstract class CodeGenerator( private def generateNullLiteral(resultType: TypeInformation[_]): GeneratedExpression = { val resultTerm = newName("result") - val nullTerm = newName("isNull") val resultTypeTerm = primitiveTypeTermForTypeInfo(resultType) val defaultValue = primitiveDefaultValue(resultType) if (nullCheck) { val wrappedCode = s""" |$resultTypeTerm $resultTerm = $defaultValue; - |boolean $nullTerm = true; |""".stripMargin - GeneratedExpression(resultTerm, nullTerm, wrappedCode, resultType, literal = true) + + // mark this expression as a constant literal + GeneratedExpression(resultTerm, ALWAYS_NULL, wrappedCode, resultType, literal = true) } else { throw new CodeGenException("Null literals are not allowed if nullCheck is disabled.") } @@ -1172,33 +1172,41 @@ abstract class CodeGenerator( literalType: TypeInformation[_], literalCode: String) : GeneratedExpression = { - val resultTerm = newName("result") - val nullTerm = newName("isNull") - val resultTypeTerm = primitiveTypeTermForTypeInfo(literalType) - - val resultCode = if (nullCheck) { - s""" - |$resultTypeTerm $resultTerm = $literalCode; - |boolean $nullTerm = false; - |""".stripMargin - } else { - s""" - |$resultTypeTerm $resultTerm = $literalCode; - |""".stripMargin - } - GeneratedExpression(resultTerm, nullTerm, resultCode, literalType, literal = true) + // mark this expression as a constant literal + generateTerm(literalType, literalCode).copy(literal = true) } private[flink] def generateSymbol(enum: Enum[_]): GeneratedExpression = { GeneratedExpression( qualifyEnum(enum), - "false", - "", + NEVER_NULL, + NO_CODE, new GenericTypeInfo(enum.getDeclaringClass)) } /** + * Generates access to a term (e.g. a field) that does not require unboxing logic. + * + * @param fieldType type of field + * @param fieldTerm expression term of field (already unboxed) + * @return internal unboxed field representation + */ + private[flink] def generateTerm( + fieldType: TypeInformation[_], + fieldTerm: String) + : GeneratedExpression = { + val resultTerm = newName("result") + val resultTypeTerm = primitiveTypeTermForTypeInfo(fieldType) + + val resultCode = s""" + |$resultTypeTerm $resultTerm = $fieldTerm; + |""".stripMargin + + GeneratedExpression(resultTerm, NEVER_NULL, resultCode, fieldType) + } + + /** * Converts the external boxed format to an internal mostly primitive field representation. * Wrapper types can autoboxed to their corresponding primitive type (Integer -> int). External * objects are converted to their internal representation (Timestamp -> internal timestamp http://git-wip-us.apache.org/repos/asf/flink/blob/963f14e2/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/CurrentTimePointCallGen.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/CurrentTimePointCallGen.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/CurrentTimePointCallGen.scala index d644847..3462368 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/CurrentTimePointCallGen.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/CurrentTimePointCallGen.scala @@ -36,23 +36,23 @@ class CurrentTimePointCallGen( : GeneratedExpression = targetType match { case SqlTimeTypeInfo.TIME if local => val time = codeGenerator.addReusableLocalTime() - codeGenerator.generateNonNullLiteral(targetType, time) + codeGenerator.generateTerm(targetType, time) case SqlTimeTypeInfo.TIMESTAMP if local => val timestamp = codeGenerator.addReusableLocalTimestamp() - codeGenerator.generateNonNullLiteral(targetType, timestamp) + codeGenerator.generateTerm(targetType, timestamp) case SqlTimeTypeInfo.DATE => val date = codeGenerator.addReusableDate() - codeGenerator.generateNonNullLiteral(targetType, date) + codeGenerator.generateTerm(targetType, date) case SqlTimeTypeInfo.TIME => val time = codeGenerator.addReusableTime() - codeGenerator.generateNonNullLiteral(targetType, time) + codeGenerator.generateTerm(targetType, time) case SqlTimeTypeInfo.TIMESTAMP => val timestamp = codeGenerator.addReusableTimestamp() - codeGenerator.generateNonNullLiteral(targetType, timestamp) + codeGenerator.generateTerm(targetType, timestamp) } } http://git-wip-us.apache.org/repos/asf/flink/blob/963f14e2/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarFunctionCallGen.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarFunctionCallGen.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarFunctionCallGen.scala index 6fad573..df206de 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarFunctionCallGen.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarFunctionCallGen.scala @@ -107,7 +107,7 @@ class ScalarFunctionCallGen( // convert result of function to internal representation (input unboxing) val resultUnboxing = if (resultClass.isPrimitive) { - codeGenerator.generateNonNullLiteral(returnType, resultTerm) + codeGenerator.generateTerm(returnType, resultTerm) } else { codeGenerator.generateInputFieldUnboxing(returnType, resultTerm) } http://git-wip-us.apache.org/repos/asf/flink/blob/963f14e2/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/CalcITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/CalcITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/CalcITCase.scala index 1a274d3..6195cbe 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/CalcITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/CalcITCase.scala @@ -29,8 +29,8 @@ import org.apache.flink.table.api.scala._ import org.apache.flink.table.expressions.Literal import org.apache.flink.table.expressions.utils._ import org.apache.flink.table.functions.ScalarFunction -import org.apache.flink.table.runtime.utils.{TableProgramsCollectionTestBase, TableProgramsTestBase, UserDefinedFunctionTestUtils} import org.apache.flink.table.runtime.utils.TableProgramsTestBase.TableConfigMode +import org.apache.flink.table.runtime.utils.{TableProgramsCollectionTestBase, TableProgramsTestBase, UserDefinedFunctionTestUtils} import org.apache.flink.test.util.TestBaseUtils import org.apache.flink.test.util.TestBaseUtils.compareResultAsText import org.apache.flink.types.Row