Repository: flink
Updated Branches:
  refs/heads/release-1.4 471e067dc -> 963f14e26


[FLINK-9229] [table] Fix literal handling in code generation

This closes #5898.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/963f14e2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/963f14e2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/963f14e2

Branch: refs/heads/release-1.4
Commit: 963f14e266e5347d98388552014c2be772d423bb
Parents: 471e067
Author: Timo Walther <twal...@apache.org>
Authored: Mon Apr 23 13:28:21 2018 +0200
Committer: Timo Walther <twal...@apache.org>
Committed: Wed Apr 25 16:55:08 2018 +0200

----------------------------------------------------------------------
 .../flink/table/codegen/CodeGenerator.scala     | 58 +++++++++++---------
 .../codegen/calls/CurrentTimePointCallGen.scala | 10 ++--
 .../codegen/calls/ScalarFunctionCallGen.scala   |  2 +-
 .../table/runtime/batch/table/CalcITCase.scala  |  2 +-
 4 files changed, 40 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/963f14e2/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
index 609a4c5..ee7bdb6 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
@@ -36,13 +36,13 @@ import org.apache.flink.streaming.api.functions.ProcessFunction
 import org.apache.flink.table.api.{TableConfig, TableException}
 import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.table.codegen.CodeGenUtils._
-import org.apache.flink.table.codegen.GeneratedExpression.{NEVER_NULL, NO_CODE}
-import org.apache.flink.table.codegen.calls.{CurrentTimePointCallGen, FunctionGenerator}
+import org.apache.flink.table.codegen.GeneratedExpression.{ALWAYS_NULL, NEVER_NULL, NO_CODE}
 import org.apache.flink.table.codegen.calls.ScalarOperators._
+import org.apache.flink.table.codegen.calls.{CurrentTimePointCallGen, FunctionGenerator}
 import org.apache.flink.table.functions.sql.{ProctimeSqlFunction, ScalarSqlFunctions, StreamRecordTimestampSqlFunction}
 import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils
-import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo
 import org.apache.flink.table.functions.{FunctionContext, UserDefinedFunction}
+import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo
 import org.apache.flink.table.typeutils.TypeCheckUtils._
 import org.joda.time.format.DateTimeFormatter
 
@@ -1104,7 +1104,7 @@ abstract class CodeGenerator(
           case ObjectFieldAccessor(field) =>
             // primitive
             if (isFieldPrimitive(field)) {
-              generateNonNullLiteral(fieldType, s"$inputTerm.${field.getName}")
+              generateTerm(fieldType, s"$inputTerm.${field.getName}")
             }
             // Object
             else {
@@ -1133,7 +1133,7 @@ abstract class CodeGenerator(
             val reflectiveAccessCode = reflectiveFieldReadAccess(fieldTerm, field, inputTerm)
             // primitive
             if (isFieldPrimitive(field)) {
-              generateNonNullLiteral(fieldType, reflectiveAccessCode)
+              generateTerm(fieldType, reflectiveAccessCode)
             }
             // Object
             else {
@@ -1153,16 +1153,16 @@ abstract class CodeGenerator(
 
   private def generateNullLiteral(resultType: TypeInformation[_]): GeneratedExpression = {
     val resultTerm = newName("result")
-    val nullTerm = newName("isNull")
     val resultTypeTerm = primitiveTypeTermForTypeInfo(resultType)
     val defaultValue = primitiveDefaultValue(resultType)
 
     if (nullCheck) {
       val wrappedCode = s"""
         |$resultTypeTerm $resultTerm = $defaultValue;
-        |boolean $nullTerm = true;
         |""".stripMargin
-      GeneratedExpression(resultTerm, nullTerm, wrappedCode, resultType, literal = true)
+
+      // mark this expression as a constant literal
+      GeneratedExpression(resultTerm, ALWAYS_NULL, wrappedCode, resultType, literal = true)
     } else {
       throw new CodeGenException("Null literals are not allowed if nullCheck is disabled.")
     }
@@ -1172,33 +1172,41 @@ abstract class CodeGenerator(
       literalType: TypeInformation[_],
       literalCode: String)
     : GeneratedExpression = {
-    val resultTerm = newName("result")
-    val nullTerm = newName("isNull")
-    val resultTypeTerm = primitiveTypeTermForTypeInfo(literalType)
-
-    val resultCode = if (nullCheck) {
-      s"""
-        |$resultTypeTerm $resultTerm = $literalCode;
-        |boolean $nullTerm = false;
-        |""".stripMargin
-    } else {
-      s"""
-        |$resultTypeTerm $resultTerm = $literalCode;
-        |""".stripMargin
-    }
 
-    GeneratedExpression(resultTerm, nullTerm, resultCode, literalType, literal = true)
+    // mark this expression as a constant literal
+    generateTerm(literalType, literalCode).copy(literal = true)
   }
 
   private[flink] def generateSymbol(enum: Enum[_]): GeneratedExpression = {
     GeneratedExpression(
       qualifyEnum(enum),
-      "false",
-      "",
+      NEVER_NULL,
+      NO_CODE,
       new GenericTypeInfo(enum.getDeclaringClass))
   }
 
   /**
+    * Generates access to a term (e.g. a field) that does not require unboxing logic.
+    *
+    * @param fieldType type of field
+    * @param fieldTerm expression term of field (already unboxed)
+    * @return internal unboxed field representation
+    */
+  private[flink] def generateTerm(
+      fieldType: TypeInformation[_],
+      fieldTerm: String)
+    : GeneratedExpression = {
+    val resultTerm = newName("result")
+    val resultTypeTerm = primitiveTypeTermForTypeInfo(fieldType)
+
+    val resultCode = s"""
+        |$resultTypeTerm $resultTerm = $fieldTerm;
+        |""".stripMargin
+
+    GeneratedExpression(resultTerm, NEVER_NULL, resultCode, fieldType)
+  }
+
+  /**
     * Converts the external boxed format to an internal mostly primitive field representation.
     * Wrapper types can autoboxed to their corresponding primitive type (Integer -> int). External
     * objects are converted to their internal representation (Timestamp -> internal timestamp

http://git-wip-us.apache.org/repos/asf/flink/blob/963f14e2/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/CurrentTimePointCallGen.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/CurrentTimePointCallGen.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/CurrentTimePointCallGen.scala
index d644847..3462368 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/CurrentTimePointCallGen.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/CurrentTimePointCallGen.scala
@@ -36,23 +36,23 @@ class CurrentTimePointCallGen(
     : GeneratedExpression = targetType match {
     case SqlTimeTypeInfo.TIME if local =>
       val time = codeGenerator.addReusableLocalTime()
-      codeGenerator.generateNonNullLiteral(targetType, time)
+      codeGenerator.generateTerm(targetType, time)
 
     case SqlTimeTypeInfo.TIMESTAMP if local =>
       val timestamp = codeGenerator.addReusableLocalTimestamp()
-      codeGenerator.generateNonNullLiteral(targetType, timestamp)
+      codeGenerator.generateTerm(targetType, timestamp)
 
     case SqlTimeTypeInfo.DATE =>
       val date = codeGenerator.addReusableDate()
-      codeGenerator.generateNonNullLiteral(targetType, date)
+      codeGenerator.generateTerm(targetType, date)
 
     case SqlTimeTypeInfo.TIME =>
       val time = codeGenerator.addReusableTime()
-      codeGenerator.generateNonNullLiteral(targetType, time)
+      codeGenerator.generateTerm(targetType, time)
 
     case SqlTimeTypeInfo.TIMESTAMP =>
       val timestamp = codeGenerator.addReusableTimestamp()
-      codeGenerator.generateNonNullLiteral(targetType, timestamp)
+      codeGenerator.generateTerm(targetType, timestamp)
   }
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/963f14e2/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarFunctionCallGen.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarFunctionCallGen.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarFunctionCallGen.scala
index 6fad573..df206de 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarFunctionCallGen.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarFunctionCallGen.scala
@@ -107,7 +107,7 @@ class ScalarFunctionCallGen(
 
     // convert result of function to internal representation (input unboxing)
     val resultUnboxing = if (resultClass.isPrimitive) {
-      codeGenerator.generateNonNullLiteral(returnType, resultTerm)
+      codeGenerator.generateTerm(returnType, resultTerm)
     } else {
       codeGenerator.generateInputFieldUnboxing(returnType, resultTerm)
     }

http://git-wip-us.apache.org/repos/asf/flink/blob/963f14e2/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/CalcITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/CalcITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/CalcITCase.scala
index 1a274d3..6195cbe 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/CalcITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/CalcITCase.scala
@@ -29,8 +29,8 @@ import org.apache.flink.table.api.scala._
 import org.apache.flink.table.expressions.Literal
 import org.apache.flink.table.expressions.utils._
 import org.apache.flink.table.functions.ScalarFunction
-import org.apache.flink.table.runtime.utils.{TableProgramsCollectionTestBase, TableProgramsTestBase, UserDefinedFunctionTestUtils}
 import org.apache.flink.table.runtime.utils.TableProgramsTestBase.TableConfigMode
+import org.apache.flink.table.runtime.utils.{TableProgramsCollectionTestBase, TableProgramsTestBase, UserDefinedFunctionTestUtils}
 import org.apache.flink.test.util.TestBaseUtils
 import org.apache.flink.test.util.TestBaseUtils.compareResultAsText
 import org.apache.flink.types.Row

Reply via email to