This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new ec3b36c  [FLINK-11830][table-planner-blink] Introduce 
CodeGeneratorContext to maintain reusable statements
ec3b36c is described below

commit ec3b36c4f0941ded4cb8768897269d9769f80532
Author: Jark Wu <imj...@gmail.com>
AuthorDate: Wed Mar 6 11:17:12 2019 +0800

    [FLINK-11830][table-planner-blink] Introduce CodeGeneratorContext to 
maintain reusable statements
    
    This closes #7906
---
 .../org/apache/flink/table/api/TableConfig.scala   | 111 ++++
 .../flink/table/codegen/CodeGenException.scala     |  24 +
 .../apache/flink/table/codegen/CodeGenUtils.scala  | 232 +++++++
 .../flink/table/codegen/CodeGeneratorContext.scala | 688 +++++++++++++++++++++
 .../flink/table/codegen/GeneratedExpression.scala  |  45 ++
 .../org/apache/flink/table/codegen/Indenter.scala  |  58 ++
 .../flink/table/typeutils/TypeCheckUtils.scala     |  54 ++
 .../apache/flink/table/generated/CompileUtils.java |  90 +++
 .../flink/table/generated/GeneratedClass.java      |  66 ++
 .../flink/table/generated/GeneratedCollector.java  |  42 ++
 .../flink/table/generated/GeneratedFunction.java   |  42 ++
 .../flink/table/generated/GeneratedInput.java      |  42 ++
 .../flink/table/generated/CompileUtilsTest.java    |  82 +++
 13 files changed, 1576 insertions(+)

diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/TableConfig.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/TableConfig.scala
new file mode 100644
index 0000000..2e8bc8a
--- /dev/null
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/TableConfig.scala
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.api
+
+import _root_.java.util.TimeZone
+import _root_.java.math.MathContext
+
+/**
+ * A config to define the runtime behavior of the Table API.
+ */
+class TableConfig {
+
+  /**
+   * Defines the timezone for date/time/timestamp conversions.
+   */
+  private var timeZone: TimeZone = TimeZone.getTimeZone("UTC")
+
+  /**
+   * Defines if all fields need to be checked for NULL first.
+   */
+  private var nullCheck: Boolean = true
+
+  /**
+    * Defines the default context for decimal division calculation.
+    * We use Scala's default MathContext.DECIMAL128.
+    */
+  private var decimalContext: MathContext = MathContext.DECIMAL128
+
+  /**
+    * Specifies a threshold where generated code will be split into 
sub-function calls. Java has a
+    * maximum method length of 64 KB. This setting allows for finer 
granularity if necessary.
+    */
+  private var maxGeneratedCodeLength: Int = 64000 // just an estimate
+
+  /**
+   * Sets the timezone for date/time/timestamp conversions.
+   */
+  def setTimeZone(timeZone: TimeZone): Unit = {
+    require(timeZone != null, "timeZone must not be null.")
+    this.timeZone = timeZone
+  }
+
+  /**
+   * Returns the timezone for date/time/timestamp conversions.
+   */
+  def getTimeZone: TimeZone = timeZone
+
+  /**
+   * Returns the NULL check. If enabled, all fields need to be checked for 
NULL first.
+   */
+  def getNullCheck: Boolean = nullCheck
+
+  /**
+   * Sets the NULL check. If enabled, all fields need to be checked for NULL 
first.
+   */
+  def setNullCheck(nullCheck: Boolean): Unit = {
+    this.nullCheck = nullCheck
+  }
+
+  /**
+    * Returns the default context for decimal division calculation.
+    * [[_root_.java.math.MathContext#DECIMAL128]] by default.
+    */
+  def getDecimalContext: MathContext = decimalContext
+
+  /**
+    * Sets the default context for decimal division calculation.
+    * [[_root_.java.math.MathContext#DECIMAL128]] by default.
+    */
+  def setDecimalContext(mathContext: MathContext): Unit = {
+    this.decimalContext = mathContext
+  }
+
+  /**
+    * Returns the current threshold where generated code will be split into 
sub-function calls.
+    * Java has a maximum method length of 64 KB. This setting allows for finer 
granularity if
+    * necessary. Default is 64000.
+    */
+  def getMaxGeneratedCodeLength: Int = maxGeneratedCodeLength
+
+  /**
+    * Returns the current threshold where generated code will be split into 
sub-function calls.
+    * Java has a maximum method length of 64 KB. This setting allows for finer 
granularity if
+    * necessary. Default is 64000.
+    */
+  def setMaxGeneratedCodeLength(maxGeneratedCodeLength: Int): Unit = {
+    if (maxGeneratedCodeLength <= 0) {
+      throw new IllegalArgumentException("Length must be greater than 0.")
+    }
+    this.maxGeneratedCodeLength = maxGeneratedCodeLength
+  }
+}
+
+object TableConfig {
+  def DEFAULT = new TableConfig()
+}
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/CodeGenException.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/CodeGenException.scala
new file mode 100644
index 0000000..1f2e9a9
--- /dev/null
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/CodeGenException.scala
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.codegen
+
+/**
+  * Exception for all errors occurring during code generation.
+  */
+class CodeGenException(msg: String) extends RuntimeException(msg)
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/CodeGenUtils.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/CodeGenUtils.scala
new file mode 100644
index 0000000..266f9fe
--- /dev/null
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/CodeGenUtils.scala
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.codegen
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.`type`._
+import org.apache.flink.table.dataformat._
+import org.apache.flink.table.typeutils.TypeCheckUtils
+
+import java.lang.reflect.Method
+import java.lang.{Boolean => JBoolean, Byte => JByte, Character => JChar, 
Double => JDouble, Float => JFloat, Integer => JInt, Long => JLong, Short => 
JShort}
+import java.util.concurrent.atomic.AtomicInteger
+
+object CodeGenUtils {
+
+  // ------------------------------- DEFAULT TERMS 
------------------------------------------
+
+  val DEFAULT_TIMEZONE_TERM = "timeZone"
+
+  // -------------------------- CANONICAL CLASS NAMES 
---------------------------------------
+
+  val BINARY_ROW: String = className[BinaryRow]
+  val BINARY_STRING: String = className[BinaryString]
+  val BASE_ROW: String = className[BaseRow]
+  val GENERIC_ROW: String = className[GenericRow]
+
+  // 
----------------------------------------------------------------------------------------
+
+  private val nameCounter = new AtomicInteger
+
+  def newName(name: String): String = {
+    s"$name$$${nameCounter.getAndIncrement}"
+  }
+
+  def newNames(names: String*): Seq[String] = {
+    require(names.toSet.size == names.length, "Duplicated names")
+    val newId = nameCounter.getAndIncrement
+    names.map(name => s"$name$$$newId")
+  }
+
+  /**
+    * Retrieve the canonical name of a class type.
+    */
+  def className[T](implicit m: Manifest[T]): String = 
m.runtimeClass.getCanonicalName
+
+  def needCopyForType(t: InternalType): Boolean = t match {
+    case InternalTypes.STRING => true
+    case _: ArrayType => true
+    case _: MapType => true
+    case _: RowType => true
+    case _: GenericType[_] => true
+    case _ => false
+  }
+
+  // when casting we first need to unbox Primitives, for example,
+  // float a = 1.0f;
+  // byte b = (byte) a;
+  // works, but for boxed types we need this:
+  // Float a = 1.0f;
+  // Byte b = (byte)(float) a;
+  def primitiveTypeTermForType(t: InternalType): String = t match {
+    case InternalTypes.INT => "int"
+    case InternalTypes.LONG => "long"
+    case InternalTypes.SHORT => "short"
+    case InternalTypes.BYTE => "byte"
+    case InternalTypes.FLOAT => "float"
+    case InternalTypes.DOUBLE => "double"
+    case InternalTypes.BOOLEAN => "boolean"
+    case InternalTypes.CHAR => "char"
+
+    case InternalTypes.DATE => "int"
+    case InternalTypes.TIME => "int"
+    case InternalTypes.TIMESTAMP => "long"
+
+    // TODO: support [INTERVAL_MONTHS] and [INTERVAL_MILLIS] in the future
+
+    case _ => boxedTypeTermForType(t)
+  }
+
+  def boxedTypeTermForType(t: InternalType): String = t match {
+    case InternalTypes.INT => className[JInt]
+    case InternalTypes.LONG => className[JLong]
+    case InternalTypes.SHORT => className[JShort]
+    case InternalTypes.BYTE => className[JByte]
+    case InternalTypes.FLOAT => className[JFloat]
+    case InternalTypes.DOUBLE => className[JDouble]
+    case InternalTypes.BOOLEAN => className[JBoolean]
+    case InternalTypes.CHAR => className[JChar]
+
+    case InternalTypes.DATE => boxedTypeTermForType(InternalTypes.INT)
+    case InternalTypes.TIME => boxedTypeTermForType(InternalTypes.INT)
+    case InternalTypes.TIMESTAMP => boxedTypeTermForType(InternalTypes.LONG)
+
+    case InternalTypes.STRING => BINARY_STRING
+
+    // TODO: Support it when we introduce [Decimal]
+    case _: DecimalType => throw new UnsupportedOperationException
+    // BINARY is also an ArrayType and uses BinaryArray internally too
+    case _: ArrayType => className[BinaryArray]
+    case _: MapType => className[BinaryMap]
+    case _: RowType => className[BaseRow]
+
+    case gt: GenericType[_] => gt.getTypeInfo.getTypeClass.getCanonicalName
+  }
+
+  /**
+    * Gets the boxed type term from external type info.
+    * We only use TypeInformation to store external type info.
+    */
+  def boxedTypeTermForExternalType(t: TypeInformation[_]): String = 
t.getTypeClass.getCanonicalName
+
+  /**
+    * Gets the default value for a primitive type, and null for generic types
+    */
+  def primitiveDefaultValue(t: InternalType): String = t match {
+    case InternalTypes.INT | InternalTypes.BYTE | InternalTypes.SHORT => "-1"
+    case InternalTypes.LONG => "-1L"
+    case InternalTypes.FLOAT => "-1.0f"
+    case InternalTypes.DOUBLE => "-1.0d"
+    case InternalTypes.BOOLEAN => "false"
+    case InternalTypes.STRING => s"$BINARY_STRING.EMPTY_UTF8"
+    case InternalTypes.CHAR => "'\\0'"
+
+    case InternalTypes.DATE | InternalTypes.TIME => "-1"
+    case InternalTypes.TIMESTAMP => "-1L"
+
+    case _ => "null"
+  }
+
+  // -------------------------- Method & Enum 
---------------------------------------
+
+  def qualifyMethod(method: Method): String =
+    method.getDeclaringClass.getCanonicalName + "." + method.getName
+
+  def qualifyEnum(enum: Enum[_]): String =
+    enum.getClass.getCanonicalName + "." + enum.name()
+
+  def compareEnum(term: String, enum: Enum[_]): Boolean = term == 
qualifyEnum(enum)
+
+  def getEnum(genExpr: GeneratedExpression): Enum[_] = {
+    val split = genExpr.resultTerm.split('.')
+    val value = split.last
+    val clazz = genExpr.resultType.asInstanceOf[GenericType[_]].getTypeClass
+    enumValueOf(clazz, value)
+  }
+
+  def enumValueOf[T <: Enum[T]](cls: Class[_], stringValue: String): Enum[_] =
+    Enum.valueOf(cls.asInstanceOf[Class[T]], stringValue).asInstanceOf[Enum[_]]
+
+  // --------------------------- Require Check 
---------------------------------------
+
+  def requireNumeric(genExpr: GeneratedExpression): Unit =
+    if (!TypeCheckUtils.isNumeric(genExpr.resultType)) {
+      throw new CodeGenException("Numeric expression type expected, but was " +
+        s"'${genExpr.resultType}'.")
+    }
+
+  def requireComparable(genExpr: GeneratedExpression): Unit =
+    if (!TypeCheckUtils.isComparable(genExpr.resultType)) {
+      throw new CodeGenException(s"Comparable type expected, but was 
'${genExpr.resultType}'.")
+    }
+
+  def requireString(genExpr: GeneratedExpression): Unit =
+    if (!TypeCheckUtils.isString(genExpr.resultType)) {
+      throw new CodeGenException("String expression type expected.")
+    }
+
+  def requireBoolean(genExpr: GeneratedExpression): Unit =
+    if (!TypeCheckUtils.isBoolean(genExpr.resultType)) {
+      throw new CodeGenException("Boolean expression type expected.")
+    }
+
+  def requireArray(genExpr: GeneratedExpression): Unit =
+    if (!TypeCheckUtils.isArray(genExpr.resultType)) {
+      throw new CodeGenException("Array expression type expected.")
+    }
+
+  def requireMap(genExpr: GeneratedExpression): Unit =
+    if (!TypeCheckUtils.isMap(genExpr.resultType)) {
+      throw new CodeGenException("Map expression type expected.")
+    }
+
+  def requireInteger(genExpr: GeneratedExpression): Unit =
+    if (!TypeCheckUtils.isInteger(genExpr.resultType)) {
+      throw new CodeGenException("Integer expression type expected.")
+    }
+
+  // --------------------------- Generate Utils 
---------------------------------------
+
+  def generateOutputRecordStatement(
+      t: InternalType,
+      clazz: Class[_],
+      outRecordTerm: String,
+      outRecordWriterTerm: Option[String] = None): String = {
+    t match {
+      case rt: RowType if clazz == classOf[BinaryRow] =>
+        val writerTerm = outRecordWriterTerm.getOrElse(
+          throw new CodeGenException("No writer is specified when writing 
BinaryRow record.")
+        )
+        val binaryRowWriter = className[BinaryRowWriter]
+        val typeTerm = clazz.getCanonicalName
+        s"""
+           |final $typeTerm $outRecordTerm = new $typeTerm(${rt.getArity});
+           |final $binaryRowWriter $writerTerm = new 
$binaryRowWriter($outRecordTerm);
+           |""".stripMargin.trim
+      case rt: RowType if classOf[ObjectArrayRow].isAssignableFrom(clazz) =>
+        val typeTerm = clazz.getCanonicalName
+        s"final $typeTerm $outRecordTerm = new $typeTerm(${rt.getArity});"
+      // TODO: support [JoinedRow] in the future
+      case _ =>
+        val typeTerm = boxedTypeTermForType(t)
+        s"final $typeTerm $outRecordTerm = new $typeTerm();"
+    }
+  }
+
+}
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/CodeGeneratorContext.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/CodeGeneratorContext.scala
new file mode 100644
index 0000000..98f1eb8
--- /dev/null
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/CodeGeneratorContext.scala
@@ -0,0 +1,688 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.codegen
+
+import org.apache.calcite.avatica.util.DateTimeUtils
+import org.apache.flink.api.common.functions.{Function, RuntimeContext}
+import org.apache.flink.table.`type`.{InternalType, InternalTypes, RowType}
+import org.apache.flink.table.api.TableConfig
+import org.apache.flink.table.codegen.CodeGenUtils._
+import org.apache.flink.table.dataformat.GenericRow
+import org.apache.flink.table.functions.{FunctionContext, UserDefinedFunction}
+import org.apache.flink.util.InstantiationUtil
+
+import scala.collection.mutable
+
+/**
+  * The context for code generator, maintaining various reusable statements 
that could be insert
+  * into different code sections in the final generated class.
+  */
+class CodeGeneratorContext(val tableConfig: TableConfig) {
+
+  // holding a list of objects that could be used passed into generated class
+  val references: mutable.ArrayBuffer[AnyRef] = new 
mutable.ArrayBuffer[AnyRef]()
+
+  // set of member statements that will be added only once
+  // we use a LinkedHashSet to keep the insertion order
+  private val reusableMemberStatements: mutable.LinkedHashSet[String] =
+    mutable.LinkedHashSet[String]()
+
+  // set of constructor statements that will be added only once
+  // we use a LinkedHashSet to keep the insertion order
+  private val reusableInitStatements: mutable.LinkedHashSet[String] =
+    mutable.LinkedHashSet[String]()
+
+  // set of open statements for RichFunction that will be added only once
+  // we use a LinkedHashSet to keep the insertion order
+  private val reusableOpenStatements: mutable.LinkedHashSet[String] =
+    mutable.LinkedHashSet[String]()
+
+  // set of close statements for RichFunction that will be added only once
+  // we use a LinkedHashSet to keep the insertion order
+  private val reusableCloseStatements: mutable.LinkedHashSet[String] =
+    mutable.LinkedHashSet[String]()
+
+  // set of endInput statements for StreamOperator that will be added only once
+  // we use a LinkedHashSet to keep the insertion order
+  private val reusableEndInputStatements: mutable.LinkedHashSet[String] =
+    mutable.LinkedHashSet[String]()
+
+  // set of statements for cleanup dataview that will be added only once
+  // we use a LinkedHashSet to keep the insertion order
+  private val reusableCleanupStatements = mutable.LinkedHashSet[String]()
+
+  // set of statements that will be added only once per record;
+  // code should only update member variables because local variables are not 
accessible if
+  // the code needs to be split;
+  // we use a LinkedHashSet to keep the insertion order
+  private val reusablePerRecordStatements: mutable.LinkedHashSet[String] =
+    mutable.LinkedHashSet[String]()
+
+  // map of initial input unboxing expressions that will be added only once
+  // (inputTerm, index) -> expr
+  private val reusableInputUnboxingExprs: mutable.Map[(String, Int), 
GeneratedExpression] =
+    mutable.Map[(String, Int), GeneratedExpression]()
+
+  // set of constructor statements that will be added only once
+  // we use a LinkedHashSet to keep the insertion order
+  private val reusableConstructorStatements: mutable.LinkedHashSet[(String, 
String)] =
+    mutable.LinkedHashSet[(String, String)]()
+
+  // set of inner class definition statements that will be added only once
+  private val reusableInnerClassDefinitionStatements: mutable.Map[String, 
String] =
+    mutable.Map[String, String]()
+
+  // map of string constants that will be added only once
+  // string_constant -> reused_term
+  private val reusableStringConstants: mutable.Map[String, String] = 
mutable.Map[String,  String]()
+
+  // map of local variable statements. It will be placed in method if method 
code not excess
+  // max code length, otherwise will be placed in member area of the class. 
The statements
+  // are maintained for multiple methods, so that it's a map from method_name 
to variables.
+  //
+  // method_name -> local_variable_statements
+  private val reusableLocalVariableStatements = mutable.Map[String, 
mutable.LinkedHashSet[String]]()
+
+  /**
+    * The current method name for [[reusableLocalVariableStatements]]. You can 
start a new
+    * local variable statements for another method using 
[[startNewLocalVariableStatement()]]
+    */
+  private var currentMethodNameForLocalVariables = "DEFAULT"
+
+  // 
---------------------------------------------------------------------------------
+  // Getter
+  // 
---------------------------------------------------------------------------------
+
+  def getReusableInputUnboxingExprs(inputTerm: String, index: Int): 
Option[GeneratedExpression] =
+    reusableInputUnboxingExprs.get((inputTerm, index))
+
+  def getNullCheck: Boolean = tableConfig.getNullCheck
+
+  // 
---------------------------------------------------------------------------------
+  // Local Variables for Code Split
+  // 
---------------------------------------------------------------------------------
+
+  /**
+    * Starts a new local variable statements for a generated class with the 
given method name.
+    * @param methodName the method name which the fields will be placed into 
if code is not split.
+    */
+  def startNewLocalVariableStatement(methodName: String): Unit = {
+    currentMethodNameForLocalVariables = methodName
+    reusableLocalVariableStatements(methodName) = 
mutable.LinkedHashSet[String]()
+  }
+
+
+  /**
+    * Adds a reusable local variable statement with the given type term and 
field name.
+    * The local variable statements will be placed in methods or class member 
area depends
+    * on whether the method length excess max code length.
+    *
+    * @param fieldName  the field name prefix
+    * @param fieldTypeTerm  the field type term
+    * @return a new generated unique field name
+    */
+  def newReusableLocalVariable(fieldTypeTerm: String, fieldName: String): 
String = {
+    val fieldTerm = newName(fieldName)
+    reusableLocalVariableStatements
+    .getOrElse(currentMethodNameForLocalVariables, 
mutable.LinkedHashSet[String]())
+    .add(s"$fieldTypeTerm $fieldTerm;")
+    fieldTerm
+  }
+
+  /**
+    * Adds multiple pairs of local variables.
+    * The local variable statements will be placed in methods or class
+    * member area depends on whether the method length excess max code length.
+    *
+    * @param fieldTypeAndNames pairs of local variables with
+    *                          left is field type term and right is field name
+    * @return the new generated unique field names for each variable pairs
+    */
+  def newReusableLocalFields(fieldTypeAndNames: (String, String)*): 
Seq[String] = {
+    val fieldTerms = newNames(fieldTypeAndNames.map(_._2): _*)
+    fieldTypeAndNames.map(_._1).zip(fieldTerms).foreach { case (fieldTypeTerm, 
fieldTerm) =>
+      reusableLocalVariableStatements
+      .getOrElse(currentMethodNameForLocalVariables, 
mutable.LinkedHashSet[String]())
+      .add(s"$fieldTypeTerm $fieldTerm;")
+    }
+    fieldTerms
+  }
+
+  // 
---------------------------------------------------------------------------------
+  // generate reuse code methods
+  // 
---------------------------------------------------------------------------------
+
+  /**
+    * @return code block of statements that need to be placed in the member 
area of the class
+    *         (e.g. inner class definition)
+    */
+  def reuseInnerClassDefinitionCode(): String = {
+    reusableInnerClassDefinitionStatements.values.mkString("\n")
+  }
+
+  /**
+    * @return code block of statements that need to be placed in the member 
area of the class
+    *         (e.g. member variables and their initialization)
+    */
+  def reuseMemberCode(): String = {
+    reusableMemberStatements.mkString("\n")
+  }
+
+  /**
+    * @return code block of statements that will be placed in the member area 
of the class
+    *         if generated code is split or in local variables of method
+    */
+  def reuseLocalVariableCode(methodName: String = null): String = {
+    if (methodName == null) {
+      
reusableLocalVariableStatements(currentMethodNameForLocalVariables).mkString("\n")
+    } else {
+      reusableLocalVariableStatements(methodName).mkString("\n")
+    }
+  }
+
+  /**
+    * @return code block of statements that need to be placed in the 
constructor
+    */
+  def reuseInitCode(): String = {
+    reusableInitStatements.mkString("\n")
+  }
+
+  /**
+    * @return code block of statements that need to be placed in the per 
recode process block
+    *         (e.g. Function or StreamOperator's processElement)
+    */
+  def reusePerRecordCode(): String = {
+    reusablePerRecordStatements.mkString("\n")
+  }
+
+  /**
+    * @return code block of statements that need to be placed in the open() 
method
+    *         (e.g. RichFunction or StreamOperator)
+    */
+  def reuseOpenCode(): String = {
+    reusableOpenStatements.mkString("\n")
+  }
+
+  /**
+    * @return code block of statements that need to be placed in the close() 
method
+    *         (e.g. RichFunction or StreamOperator)
+    */
+  def reuseCloseCode(): String = {
+    reusableCloseStatements.mkString("\n")
+  }
+
+  /**
+    * @return code block of statements that need to be placed in the 
endInput() method
+    *         (StreamOperator)
+    */
+  def reuseEndInputCode(): String = {
+    reusableEndInputStatements.mkString("\n")
+  }
+
+  /**
+    * @return code block of statements that need to be placed in the cleanup() 
method of
+    *         [AggregationsFunction]
+    */
+  def reuseCleanupCode(): String = {
+    reusableCleanupStatements.mkString("", "\n", "\n")
+  }
+
+  /**
+    * @return code block of statements that unbox input variables to a 
primitive variable
+    *         and a corresponding null flag variable
+    */
+  def reuseInputUnboxingCode(): String = {
+    reusableInputUnboxingExprs.values.map(_.code).mkString("\n")
+  }
+
+  /**
+    * Returns code block of unboxing input variables which belongs to the 
given inputTerm.
+    */
+  def reuseInputUnboxingCode(inputTerm: String): String = {
+    val exprs = reusableInputUnboxingExprs.filter { case ((term, _), _) =>
+      inputTerm.equals(term)
+    }
+    val codes = for (((_, _), expr) <- exprs) yield expr.code
+    codes.mkString("\n").trim
+  }
+
+  /**
+    * @return code block of constructor statements
+    */
+  def reuseConstructorCode(className: String): String = {
+    reusableConstructorStatements.map { case (params, body) =>
+      s"""
+         |public $className($params) throws Exception {
+         |  this();
+         |  $body
+         |}
+         |""".stripMargin
+    }.mkString("\n")
+  }
+
+  // 
---------------------------------------------------------------------------------
+  // add reusable code blocks
+  // 
---------------------------------------------------------------------------------
+
+  /**
+    * Adds a reusable inner class statement with the given class name and 
class code
+    */
+  def addReusableInnerClass(className: String, statements: String): Unit = {
+    reusableInnerClassDefinitionStatements(className) = statements
+  }
+
+  /**
+    * Adds a reusable member field statement to the member area.
+    *
+    * @param memberStatement the member field declare statement
+    */
+  def addReusableMember(memberStatement: String): Unit = {
+    reusableMemberStatements.add(memberStatement)
+  }
+
+  /**
+    * Adds a reusable per record statement
+    */
+  def addReusablePerRecordStatement(s: String): Unit = 
reusablePerRecordStatements.add(s)
+
+  /**
+    * Adds a reusable open statement
+    */
+  def addReusableOpenStatement(s: String): Unit = reusableOpenStatements.add(s)
+
+  /**
+    * Adds a reusable close statement
+    */
+  def addReusableCloseStatement(s: String): Unit = 
reusableCloseStatements.add(s)
+
+  /**
+    * Adds a reusable endInput statement
+    */
+  def addReusableEndInputStatement(s: String): Unit = 
reusableEndInputStatements.add(s)
+
+  /**
+    * Adds a reusable cleanup statement
+    */
+  def addReusableCleanupStatement(s: String): Unit = 
reusableCleanupStatements.add(s)
+
+
+  /**
+    * Adds a reusable input unboxing expression
+    */
+  def addReusableInputUnboxingExprs(
+    inputTerm: String,
+    index: Int,
+    expr: GeneratedExpression): Unit = reusableInputUnboxingExprs((inputTerm, 
index)) = expr
+
+  /**
+    * Adds a reusable output record statement to member area.
+    */
+  def addReusableOutputRecord(
+      t: InternalType,
+      clazz: Class[_],
+      outRecordTerm: String,
+      outRecordWriterTerm: Option[String] = None): Unit = {
+    val statement = generateOutputRecordStatement(t, clazz, outRecordTerm, 
outRecordWriterTerm)
+    reusableMemberStatements.add(statement)
+  }
+
+  /**
+    * Adds a reusable null [[org.apache.flink.table.dataformat.GenericRow]] to 
the member area.
+    */
+  def addReusableNullRow(rowTerm: String, arity: Int): Unit = {
+    addReusableOutputRecord(
+      new RowType((0 until arity).map(_ => InternalTypes.INT): _*),
+      classOf[GenericRow],
+      rowTerm)
+  }
+
+  /**
+    * Adds a reusable timestamp to the beginning of the SAM of the generated 
class.
+    */
+  def addReusableTimestamp(): String = {
+    val fieldTerm = s"timestamp"
+    val field =
+      s"""
+         |final long $fieldTerm = java.lang.System.currentTimeMillis();
+         |""".stripMargin
+    reusablePerRecordStatements.add(field)
+    fieldTerm
+  }
+
+  /**
+    * Adds a reusable local timestamp to the beginning of the SAM of the 
generated class.
+    */
+  def addReusableLocalTimestamp(): String = {
+    addReusableTimestamp()
+  }
+
+  /**
+    * Adds a reusable time to the beginning of the SAM of the generated class.
+    */
+  def addReusableTime(): String = {
+    val fieldTerm = s"time"
+    val timestamp = addReusableTimestamp()
+    // adopted from org.apache.calcite.runtime.SqlFunctions.currentTime()
+    val field =
+      s"""
+         |final int $fieldTerm = (int) ($timestamp % 
${DateTimeUtils.MILLIS_PER_DAY});
+         |if (time < 0) {
+         |  time += ${DateTimeUtils.MILLIS_PER_DAY};
+         |}
+         |""".stripMargin
+    reusablePerRecordStatements.add(field)
+    fieldTerm
+  }
+
+  /**
+    * Adds a reusable local time to the beginning of the SAM of the generated 
class.
+    */
+  def addReusableLocalTime(): String = {
+    val fieldTerm = s"localtime"
+    val timeZone = addReusableTimeZone()
+    val localtimestamp = addReusableLocalTimestamp()
+    // adopted from org.apache.calcite.runtime.SqlFunctions.localTime()
+    val field =
+      s"""
+         |final int $fieldTerm = (int) ( ($localtimestamp + 
$timeZone.getOffset($localtimestamp))
+         |                              % ${DateTimeUtils.MILLIS_PER_DAY});
+         |""".stripMargin
+    reusablePerRecordStatements.add(field)
+    fieldTerm
+  }
+
+  /**
+    * Adds a reusable date to the beginning of the SAM of the generated class.
+    */
+  def addReusableDate(): String = {
+    val fieldTerm = s"date"
+    val timestamp = addReusableTimestamp()
+    val time = addReusableTime()
+    val timeZone = addReusableTimeZone()
+
+    // adopted from org.apache.calcite.runtime.SqlFunctions.currentDate()
+    val field =
+      s"""
+         |final int $fieldTerm = (int) (($timestamp + 
$timeZone.getOffset($timestamp))
+         |                              / ${DateTimeUtils.MILLIS_PER_DAY});
+         |if ($time < 0) {
+         |  $fieldTerm -= 1;
+         |}
+         |""".stripMargin
+    reusablePerRecordStatements.add(field)
+    fieldTerm
+  }
+
+  /**
+    * Adds a reusable TimeZone to the member area of the generated class.
+    */
+  def addReusableTimeZone(): String = {
+    val zoneID = tableConfig.getTimeZone.getID
+    val stmt =
+      s"""private static final java.util.TimeZone $DEFAULT_TIMEZONE_TERM =
+         |                 
java.util.TimeZone.getTimeZone("$zoneID");""".stripMargin
+    addReusableMember(stmt)
+    DEFAULT_TIMEZONE_TERM
+  }
+
+
+  /**
+    * Adds a reusable [[java.util.Random]] to the member area of the generated 
class.
+    *
+    * The seed parameter must be a literal/constant expression.
+    *
+    * @return member variable term
+    */
+  def addReusableRandom(seedExpr: Option[GeneratedExpression]): String = {
+    val fieldTerm = newName("random")
+
+    val field =
+      s"""
+         |final java.util.Random $fieldTerm;
+         |""".stripMargin
+
+    val fieldInit = seedExpr match {
+      case Some(s) if getNullCheck =>
+        s"""
+           |${s.code}
+           |if (!${s.nullTerm}) {
+           |  $fieldTerm = new java.util.Random(${s.resultTerm});
+           |}
+           |else {
+           |  $fieldTerm = new java.util.Random();
+           |}
+           |""".stripMargin
+      case Some(s) =>
+        s"""
+           |${s.code}
+           |$fieldTerm = new java.util.Random(${s.resultTerm});
+           |""".stripMargin
+      case _ =>
+        s"""
+           |$fieldTerm = new java.util.Random();
+           |""".stripMargin
+    }
+
+    reusableMemberStatements.add(field)
+    reusableInitStatements.add(fieldInit)
+    fieldTerm
+  }
+
+  /**
+    * Adds a reusable Object to the member area of the generated class
+    * @param obj  the object to be added to the generated class
+    * @param fieldNamePrefix  prefix field name of the generated member field 
term
+    * @param fieldTypeTerm  field type class name
+    * @return the generated unique field term
+    */
+  def addReusableObject(
+      obj: AnyRef,
+      fieldNamePrefix: String,
+      fieldTypeTerm: String = null): String = {
+    val fieldTerm = newName(fieldNamePrefix)
+    val clsName = 
Option(fieldTypeTerm).getOrElse(obj.getClass.getCanonicalName)
+    addReusableObjectInternal(obj, fieldTerm, clsName)
+    fieldTerm
+  }
+
+  private def addReusableObjectInternal(
+      obj: AnyRef,
+      fieldTerm: String,
+      fieldTypeTerm: String): Unit = {
+    val idx = references.length
+    // make a deep copy of the object
+    val byteArray = InstantiationUtil.serializeObject(obj)
+    val objCopy: AnyRef = InstantiationUtil.deserializeObject(
+      byteArray,
+      obj.getClass.getClassLoader)
+    references += objCopy
+
+    reusableMemberStatements.add(s"private transient $fieldTypeTerm 
$fieldTerm;")
+    reusableInitStatements.add(s"$fieldTerm = ((($fieldTypeTerm) 
references[$idx]));")
+  }
+
+  /**
+    * Adds a reusable [[UserDefinedFunction]] to the member area of the 
generated [[Function]].
+    *
+    * @param function [[UserDefinedFunction]] object to be instantiated during 
runtime
+    * @param contextTerm [[RuntimeContext]] term to access the 
[[RuntimeContext]]
+    * @return member variable term
+    */
+  def addReusableFunction(function: UserDefinedFunction, contextTerm: String = 
null): String = {
+    val classQualifier = function.getClass.getCanonicalName
+    val fieldTerm = s"function_${function.functionIdentifier}"
+
+    addReusableObjectInternal(function, fieldTerm, classQualifier)
+
+    val openFunction = if (contextTerm != null) {
+      s"""
+         |$fieldTerm.open(new 
${classOf[FunctionContext].getCanonicalName}($contextTerm));
+       """.stripMargin
+    } else {
+      s"""
+         |$fieldTerm.open(new 
${classOf[FunctionContext].getCanonicalName}(getRuntimeContext()));
+       """.stripMargin
+    }
+    reusableOpenStatements.add(openFunction)
+
+    val closeFunction =
+      s"""
+         |$fieldTerm.close();
+       """.stripMargin
+    reusableCloseStatements.add(closeFunction)
+
+    fieldTerm
+  }
+
+  /**
+    * Adds a reusable static SLF4J Logger to the member area of the generated 
class.
+    */
+  def addReusableLogger(logTerm: String, clazzTerm: String): Unit = {
+    val stmt =
+      s"""
+         |private static final org.slf4j.Logger $logTerm =
+         |  org.slf4j.LoggerFactory.getLogger("$clazzTerm");
+         |""".stripMargin
+    reusableMemberStatements.add(stmt)
+  }
+
+  /**
+    * Adds a reusable constant to the member area of the generated class.
+    *
+    * @param constant constant expression
+    * @return generated expression with the fieldTerm and nullTerm
+    */
+  def addReusableConstant(
+      constant: GeneratedExpression,
+      nullCheck: Boolean): GeneratedExpression = {
+    require(constant.literal, "Literal expected")
+
+    val fieldTerm = newName("constant")
+    val nullTerm = fieldTerm + "isNull"
+
+    val fieldType = primitiveTypeTermForType(constant.resultType)
+
+    val field =
+      s"""
+         |private final $fieldType $fieldTerm;
+         |private final boolean $nullTerm;
+         |""".stripMargin
+    reusableMemberStatements.add(field)
+
+    val init =
+      s"""
+         |${constant.code}
+         |$fieldTerm = ${constant.resultTerm};
+         |$nullTerm = ${constant.nullTerm};
+         |""".stripMargin
+    reusableInitStatements.add(init)
+
+    GeneratedExpression(fieldTerm, nullTerm, "", constant.resultType)
+  }
+
+
+  /**
+    * Adds a reusable string constant to the member area of the generated 
class.
+    */
+  def addReusableStringConstants(value: String): String = {
+    reusableStringConstants.get(value) match {
+      case Some(field) => field
+      case None =>
+        val field = newName("str")
+        val stmt =
+          s"""
+             |private final $BINARY_STRING $field = 
$BINARY_STRING.fromString("$value");"
+           """.stripMargin
+        reusableMemberStatements.add(stmt)
+        reusableStringConstants(value) = field
+        field
+    }
+  }
+
+  /**
+    * Adds a reusable MessageDigest to the member area of the generated 
[[Function]].
+    *
+    * @return member variable term
+    */
+  def addReusableMessageDigest(algorithm: String): String = {
+    val fieldTerm = newName("messageDigest")
+
+    val field = s"final java.security.MessageDigest $fieldTerm;"
+    reusableMemberStatements.add(field)
+
+    val fieldInit =
+      s"""
+         |try {
+         |  $fieldTerm = java.security.MessageDigest.getInstance("$algorithm");
+         |} catch (java.security.NoSuchAlgorithmException e) {
+         |  throw new RuntimeException("Algorithm for '$algorithm' is not 
available.", e);
+         |}
+         |""".stripMargin
+    reusableInitStatements.add(fieldInit)
+
+    fieldTerm
+  }
+
+  /**
+    * Adds a constant SHA2 reusable MessageDigest to the member area of the 
generated [[Function]].
+    *
+    * @return member variable term
+    */
+  def addReusableSha2MessageDigest(constant: GeneratedExpression, nullCheck: 
Boolean): String = {
+    require(constant.literal, "Literal expected")
+    val fieldTerm = newName("messageDigest")
+
+    val field =
+      s"final java.security.MessageDigest $fieldTerm;"
+    reusableMemberStatements.add(field)
+
+    val bitLen = constant.resultTerm
+    val init =
+      s"""
+         |if ($bitLen == 224 || $bitLen == 256 || $bitLen == 384 || $bitLen == 
512) {
+         |  try {
+         |    $fieldTerm = java.security.MessageDigest.getInstance("SHA-" + 
$bitLen);
+         |  } catch (java.security.NoSuchAlgorithmException e) {
+         |    throw new RuntimeException(
+         |      "Algorithm for 'SHA-" + $bitLen + "' is not available.", e);
+         |  }
+         |} else {
+         |  throw new RuntimeException("Unsupported algorithm.");
+         |}
+         |""".stripMargin
+    val nullableInit = if (nullCheck) {
+      s"""
+         |${constant.code}
+         |if (${constant.nullTerm}) {
+         |  $fieldTerm = null;
+         |} else {
+         |  $init
+         |}
+         |""".stripMargin
+    } else {
+      s"""
+         |${constant.code}
+         |$init
+         |""".stripMargin
+    }
+    reusableInitStatements.add(nullableInit)
+
+    fieldTerm
+  }
+}
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/GeneratedExpression.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/GeneratedExpression.scala
new file mode 100644
index 0000000..35a3999
--- /dev/null
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/GeneratedExpression.scala
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.codegen
+
+import org.apache.flink.table.`type`.InternalType
+
+/**
+  * Describes a generated expression.
+  *
+  * @param resultTerm term to access the result of the expression
+  * @param nullTerm boolean term that indicates if expression is null
+  * @param code code necessary to produce resultTerm and nullTerm
+  * @param resultType type of the resultTerm
+  * @param literal flag to indicate a constant expression do not reference 
input and can thus
+  *                 be used in the member area (e.g. as constructor parameter 
of a reusable
+  *                 instance)
+  */
+case class GeneratedExpression(
+  resultTerm: String,
+  nullTerm: String,
+  code: String,
+  resultType: InternalType,
+  literal: Boolean = false)
+
+object GeneratedExpression {
+  val ALWAYS_NULL = "true"
+  val NEVER_NULL = "false"
+  val NO_CODE = ""
+}
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/Indenter.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/Indenter.scala
new file mode 100644
index 0000000..bff8a62
--- /dev/null
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/Indenter.scala
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.codegen
+
+class IndentStringContext(sc: StringContext) {
+  def j(args: Any*): String = {
+    val sb = new StringBuilder()
+    for ((s, a) <- sc.parts zip args) {
+      sb append s
+
+      val ind = getindent(s)
+      if (ind.nonEmpty) {
+        sb append a.toString.replaceAll("\n", "\n" + ind)
+      } else {
+        sb append a.toString
+      }
+    }
+    if (sc.parts.size > args.size) {
+      sb append sc.parts.last
+    }
+
+    sb.toString()
+  }
+
+  // get white indent after the last new line, if any
+  def getindent(str: String): String = {
+    val lastnl = str.lastIndexOf("\n")
+    if (lastnl == -1) ""
+    else {
+      val ind = str.substring(lastnl + 1)
+      val trimmed = ind.trim
+      if (trimmed.isEmpty || trimmed == "|") {
+        ind // ind is all whitespace or pipe for use with stripMargin. Use this
+      } else {
+        ""
+      }
+    }
+  }
+}
+
+object Indenter {
+  implicit def toISC(sc: StringContext): IndentStringContext = new 
IndentStringContext(sc)
+}
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/typeutils/TypeCheckUtils.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/typeutils/TypeCheckUtils.scala
new file mode 100644
index 0000000..41676a8
--- /dev/null
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/typeutils/TypeCheckUtils.scala
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.typeutils
+
+import org.apache.flink.table.`type`._
+
+object TypeCheckUtils {
+
+  def isNumeric(dataType: InternalType): Boolean = dataType match {
+    case InternalTypes.INT | InternalTypes.BYTE | InternalTypes.SHORT
+         | InternalTypes.LONG | InternalTypes.FLOAT | InternalTypes.DOUBLE => 
true
+    case _: DecimalType => true
+    case _ => false
+  }
+
+  def isString(dataType: InternalType): Boolean = dataType == 
InternalTypes.STRING
+
+  def isBinary(dataType: InternalType): Boolean = dataType == 
InternalTypes.BINARY
+
+  def isBoolean(dataType: InternalType): Boolean = dataType == 
InternalTypes.BOOLEAN
+
+  def isDecimal(dataType: InternalType): Boolean = 
dataType.isInstanceOf[DecimalType]
+
+  def isInteger(dataType: InternalType): Boolean = dataType == 
InternalTypes.INT
+
+  def isLong(dataType: InternalType): Boolean = dataType == InternalTypes.LONG
+
+  def isArray(dataType: InternalType): Boolean = 
dataType.isInstanceOf[ArrayType]
+
+  def isMap(dataType: InternalType): Boolean = dataType.isInstanceOf[MapType]
+
+  def isComparable(dataType: InternalType): Boolean =
+    !dataType.isInstanceOf[GenericType[_]] &&
+      !dataType.isInstanceOf[MapType] &&
+      !dataType.isInstanceOf[RowType] &&
+      !isArray(dataType)
+
+}
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/generated/CompileUtils.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/generated/CompileUtils.java
new file mode 100644
index 0000000..c81d2cc
--- /dev/null
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/generated/CompileUtils.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.generated;
+
+import org.apache.flink.api.common.InvalidProgramException;
+import org.apache.flink.api.java.tuple.Tuple2;
+
+import org.apache.flink.shaded.guava18.com.google.common.cache.Cache;
+import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder;
+
+import org.codehaus.janino.SimpleCompiler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Utilities to compile a generated code to a Class.
+ */
+public final class CompileUtils {
+
+       // used for logging the generated codes to a same place
+       private static final Logger CODE_LOG = 
LoggerFactory.getLogger(CompileUtils.class);
+
+       /**
+        * Cache of compile, Janino generates a new Class Loader and a new 
Class file every compile
+        * (guaranteeing that the class name will not be repeated). This leads 
to multiple tasks of
+        * the same process that generate a large number of duplicate class, 
resulting in a large
+        * number of Meta zone GC (class unloading), resulting in performance 
bottlenecks. So we add
+        * a cache to avoid this problem.
+        */
+       protected static final Cache<Tuple2<ClassLoader, String>, Class<?>> 
COMPILED_CACHE = CacheBuilder
+               .newBuilder()
+               .maximumSize(100)   // estimated cache size
+               .build();
+
+       /**
+        * Compiles a generated code to a Class.
+        * @param cl the ClassLoader used to load the class
+        * @param name  the class name
+        * @param code  the generated code
+        * @param <T>   the class type
+        * @return  the compiled class
+        */
+       public static <T> Class<T> compile(ClassLoader cl, String name, String 
code) {
+               Tuple2<ClassLoader, String> cacheKey = Tuple2.of(cl, name);
+               Class<?> clazz = COMPILED_CACHE.getIfPresent(cacheKey);
+               if (clazz == null) {
+                       clazz = doCompile(cl, name, code);
+                       COMPILED_CACHE.put(cacheKey, clazz);
+               }
+               //noinspection unchecked
+               return (Class<T>) clazz;
+       }
+
+       private static <T> Class<T> doCompile(ClassLoader cl, String name, 
String code) {
+               checkNotNull(cl, "Classloader must not be null.");
+               CODE_LOG.debug("Compiling: %s \n\n Code:\n%s", name, code);
+               SimpleCompiler compiler = new SimpleCompiler();
+               compiler.setParentClassLoader(cl);
+               try {
+                       compiler.cook(code);
+               } catch (Throwable t) {
+                       throw new InvalidProgramException(
+                               "Table program cannot be compiled. This is a 
bug. Please file an issue.", t);
+               }
+               try {
+                       //noinspection unchecked
+                       return (Class<T>) 
compiler.getClassLoader().loadClass(name);
+               } catch (ClassNotFoundException e) {
+                       throw new RuntimeException("Can not load class " + 
name, e);
+               }
+       }
+}
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/generated/GeneratedClass.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/generated/GeneratedClass.java
new file mode 100644
index 0000000..8f26e14
--- /dev/null
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/generated/GeneratedClass.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.generated;
+
+import java.io.Serializable;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A wrapper for generated class, defines a {@link #newInstance(ClassLoader)} 
method
+ * to get an instance by reference objects easily.
+ */
+public abstract class GeneratedClass<T> implements Serializable {
+
+       private final String className;
+       private final String code;
+       private final Object[] references;
+
+       private transient Class<T> compiledClass;
+
+       protected GeneratedClass(String className, String code, Object[] 
references) {
+               checkNotNull(className, "name must not be null");
+               checkNotNull(code, "code must not be null");
+               checkNotNull(references, "references must not be null");
+               this.className = className;
+               this.code = code;
+               this.references = references;
+       }
+
+       /**
+        * Create a new instance of this generated class.
+        */
+       @SuppressWarnings("unchecked")
+       public T newInstance(ClassLoader classLoader) {
+               try {
+                       return (T) 
compile(classLoader).getConstructor(Object[].class).newInstance(references);
+               } catch (Exception e) {
+                       throw new RuntimeException(
+                               "Could not instantiate generated class '" + 
className + "'", e);
+               }
+       }
+
+       private Class<?> compile(ClassLoader classLoader) {
+               if (compiledClass == null) {
+                       // cache the compiled class
+                       compiledClass = CompileUtils.compile(classLoader, 
className, code);
+               }
+               return compiledClass;
+       }
+}
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/generated/GeneratedCollector.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/generated/GeneratedCollector.java
new file mode 100644
index 0000000..ce082ba
--- /dev/null
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/generated/GeneratedCollector.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.generated;
+
+import org.apache.flink.util.Collector;
+
+/**
+ * Describes a generated {@link Collector}.
+ *
+ * @param <C> type of collector
+ */
+public final class GeneratedCollector<C extends Collector<?>> extends 
GeneratedClass<C> {
+
+       private static final long serialVersionUID = -7355875544905245676L;
+
+       /**
+        * Creates a GeneratedCollector.
+        *
+        * @param className class name of the generated Collector.
+        * @param code code of the generated Collector.
+        * @param references referenced objects of the generated Collector.
+        */
+       public GeneratedCollector(String className, String code, Object[] 
references) {
+               super(className, code, references);
+       }
+}
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/generated/GeneratedFunction.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/generated/GeneratedFunction.java
new file mode 100644
index 0000000..e43da42
--- /dev/null
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/generated/GeneratedFunction.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.generated;
+
+import org.apache.flink.api.common.functions.Function;
+
+/**
+ * Describes a generated {@link Function}.
+ *
+ * @param <F> type of Function
+ */
+public final class GeneratedFunction<F extends Function> extends 
GeneratedClass<F> {
+
+       private static final long serialVersionUID = -7355875544905245676L;
+
+       /**
+        * Creates a GeneratedFunction.
+        *
+        * @param className class name of the generated Function.
+        * @param code code of the generated Function.
+        * @param references referenced objects of the generated Function.
+        */
+       public GeneratedFunction(String className, String code, Object[] 
references) {
+               super(className, code, references);
+       }
+}
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/generated/GeneratedInput.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/generated/GeneratedInput.java
new file mode 100644
index 0000000..9331bb3
--- /dev/null
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/generated/GeneratedInput.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.generated;
+
+import org.apache.flink.api.common.io.InputFormat;
+
+/**
+ * Describes a generated {@link InputFormat}.
+ *
+ * @param <F> type of Function
+ */
+public final class GeneratedInput<F extends InputFormat<?, ?>> extends 
GeneratedClass<F> {
+
+       private static final long serialVersionUID = -7355875544905245676L;
+
+       /**
+        * Creates a GeneratedInput.
+        *
+        * @param className class name of the generated Function.
+        * @param code code of the generated Function.
+        * @param references referenced objects of the generated Function.
+        */
+       public GeneratedInput(String className, String code, Object[] 
references) {
+               super(className, code, references);
+       }
+}
diff --git 
a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/generated/CompileUtilsTest.java
 
b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/generated/CompileUtilsTest.java
new file mode 100644
index 0000000..2e67713
--- /dev/null
+++ 
b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/generated/CompileUtilsTest.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.generated;
+
+import org.apache.flink.api.common.InvalidProgramException;
+
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.net.URL;
+import java.net.URLClassLoader;
+
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertSame;
+
+/**
+ * Tests for {@link CompileUtils}.
+ */
+public class CompileUtilsTest {
+
+       @Rule
+       public ExpectedException thrown = ExpectedException.none();
+
+       @Before
+       public void before() {
+               // cleanup cached class before tests
+               CompileUtils.COMPILED_CACHE.invalidateAll();
+       }
+
+       @Test
+       public void testCacheReuse() {
+               String code =
+                       "public class Main {\n" +
+                       "  int i;\n" +
+                       "  int j;\n" +
+                       "}";
+
+               Class<?> class1 = 
CompileUtils.compile(this.getClass().getClassLoader(), "Main", code);
+               Class<?> class2 = 
CompileUtils.compile(this.getClass().getClassLoader(), "Main", code);
+               Class<?> class3 = CompileUtils.compile(new TestClassLoader(), 
"Main", code);
+               assertSame(class1, class2);
+               assertNotSame(class1, class3);
+       }
+
+       @Test
+       public void testWrongCode() {
+               String code =
+                       "public class111 Main {\n" +
+                       "  int i;\n" +
+                       "  int j;\n" +
+                       "}";
+
+               thrown.expect(InvalidProgramException.class);
+               thrown.expectMessage("Table program cannot be compiled. This is 
a bug. Please file an issue.");
+               CompileUtils.compile(this.getClass().getClassLoader(), "Main", 
code);
+       }
+
+       private static class TestClassLoader extends URLClassLoader {
+
+               TestClassLoader() {
+                       super(new URL[0], 
Thread.currentThread().getContextClassLoader());
+               }
+       }
+}

Reply via email to