This is an automated email from the ASF dual-hosted git repository. jark pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new ec3b36c [FLINK-11830][table-planner-blink] Introduce CodeGeneratorContext to maintain reusable statements ec3b36c is described below commit ec3b36c4f0941ded4cb8768897269d9769f80532 Author: Jark Wu <imj...@gmail.com> AuthorDate: Wed Mar 6 11:17:12 2019 +0800 [FLINK-11830][table-planner-blink] Introduce CodeGeneratorContext to maintain reusable statements This closes #7906 --- .../org/apache/flink/table/api/TableConfig.scala | 111 ++++ .../flink/table/codegen/CodeGenException.scala | 24 + .../apache/flink/table/codegen/CodeGenUtils.scala | 232 +++++++ .../flink/table/codegen/CodeGeneratorContext.scala | 688 +++++++++++++++++++++ .../flink/table/codegen/GeneratedExpression.scala | 45 ++ .../org/apache/flink/table/codegen/Indenter.scala | 58 ++ .../flink/table/typeutils/TypeCheckUtils.scala | 54 ++ .../apache/flink/table/generated/CompileUtils.java | 90 +++ .../flink/table/generated/GeneratedClass.java | 66 ++ .../flink/table/generated/GeneratedCollector.java | 42 ++ .../flink/table/generated/GeneratedFunction.java | 42 ++ .../flink/table/generated/GeneratedInput.java | 42 ++ .../flink/table/generated/CompileUtilsTest.java | 82 +++ 13 files changed, 1576 insertions(+) diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/TableConfig.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/TableConfig.scala new file mode 100644 index 0000000..2e8bc8a --- /dev/null +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/TableConfig.scala @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.table.api + +import _root_.java.util.TimeZone +import _root_.java.math.MathContext + +/** + * A config to define the runtime behavior of the Table API. + */ +class TableConfig { + + /** + * Defines the timezone for date/time/timestamp conversions. + */ + private var timeZone: TimeZone = TimeZone.getTimeZone("UTC") + + /** + * Defines if all fields need to be checked for NULL first. + */ + private var nullCheck: Boolean = true + + /** + * Defines the default context for decimal division calculation. + * We use Scala's default MathContext.DECIMAL128. + */ + private var decimalContext: MathContext = MathContext.DECIMAL128 + + /** + * Specifies a threshold where generated code will be split into sub-function calls. Java has a + * maximum method length of 64 KB. This setting allows for finer granularity if necessary. + */ + private var maxGeneratedCodeLength: Int = 64000 // just an estimate + + /** + * Sets the timezone for date/time/timestamp conversions. + */ + def setTimeZone(timeZone: TimeZone): Unit = { + require(timeZone != null, "timeZone must not be null.") + this.timeZone = timeZone + } + + /** + * Returns the timezone for date/time/timestamp conversions. + */ + def getTimeZone: TimeZone = timeZone + + /** + * Returns the NULL check. If enabled, all fields need to be checked for NULL first. 
+ */ + def getNullCheck: Boolean = nullCheck + + /** + * Sets the NULL check. If enabled, all fields need to be checked for NULL first. + */ + def setNullCheck(nullCheck: Boolean): Unit = { + this.nullCheck = nullCheck + } + + /** + * Returns the default context for decimal division calculation. + * [[_root_.java.math.MathContext#DECIMAL128]] by default. + */ + def getDecimalContext: MathContext = decimalContext + + /** + * Sets the default context for decimal division calculation. + * [[_root_.java.math.MathContext#DECIMAL128]] by default. + * + * @param mathContext the new decimal context, must not be null + * @throws IllegalArgumentException if mathContext is null + */ + def setDecimalContext(mathContext: MathContext): Unit = { + // reject null eagerly (like setTimeZone) instead of failing later inside generated code + require(mathContext != null, "mathContext must not be null.") + this.decimalContext = mathContext + } + + /** + * Returns the current threshold where generated code will be split into sub-function calls. + * Java has a maximum method length of 64 KB. This setting allows for finer granularity if + * necessary. Default is 64000. + */ + def getMaxGeneratedCodeLength: Int = maxGeneratedCodeLength + + /** + * Sets the threshold where generated code will be split into sub-function calls. + * Java has a maximum method length of 64 KB. This setting allows for finer granularity if + * necessary. Default is 64000. The value must be greater than 0.
+ */ + def setMaxGeneratedCodeLength(maxGeneratedCodeLength: Int): Unit = { + if (maxGeneratedCodeLength <= 0) { + throw new IllegalArgumentException("Length must be greater than 0.") + } + this.maxGeneratedCodeLength = maxGeneratedCodeLength + } +} + +object TableConfig { + def DEFAULT = new TableConfig() +} diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/CodeGenException.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/CodeGenException.scala new file mode 100644 index 0000000..1f2e9a9 --- /dev/null +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/CodeGenException.scala @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.codegen + +/** + * Exception for all errors occurring during code generation. 
+ */ +class CodeGenException(msg: String) extends RuntimeException(msg) diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/CodeGenUtils.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/CodeGenUtils.scala new file mode 100644 index 0000000..266f9fe --- /dev/null +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/CodeGenUtils.scala @@ -0,0 +1,232 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.codegen + +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.table.`type`._ +import org.apache.flink.table.dataformat._ +import org.apache.flink.table.typeutils.TypeCheckUtils + +import java.lang.reflect.Method +import java.lang.{Boolean => JBoolean, Byte => JByte, Character => JChar, Double => JDouble, Float => JFloat, Integer => JInt, Long => JLong, Short => JShort} +import java.util.concurrent.atomic.AtomicInteger + +object CodeGenUtils { + + // ------------------------------- DEFAULT TERMS ------------------------------------------ + + val DEFAULT_TIMEZONE_TERM = "timeZone" + + // -------------------------- CANONICAL CLASS NAMES --------------------------------------- + + val BINARY_ROW: String = className[BinaryRow] + val BINARY_STRING: String = className[BinaryString] + val BASE_ROW: String = className[BaseRow] + val GENERIC_ROW: String = className[GenericRow] + + // ---------------------------------------------------------------------------------------- + + private val nameCounter = new AtomicInteger + + def newName(name: String): String = { + s"$name$$${nameCounter.getAndIncrement}" + } + + def newNames(names: String*): Seq[String] = { + require(names.toSet.size == names.length, "Duplicated names") + val newId = nameCounter.getAndIncrement + names.map(name => s"$name$$$newId") + } + + /** + * Retrieve the canonical name of a class type. 
+ */ + def className[T](implicit m: Manifest[T]): String = m.runtimeClass.getCanonicalName + + def needCopyForType(t: InternalType): Boolean = t match { + case InternalTypes.STRING => true + case _: ArrayType => true + case _: MapType => true + case _: RowType => true + case _: GenericType[_] => true + case _ => false + } + + // when casting we first need to unbox Primitives, for example, + // float a = 1.0f; + // byte b = (byte) a; + // works, but for boxed types we need this: + // Float a = 1.0f; + // Byte b = (byte)(float) a; + def primitiveTypeTermForType(t: InternalType): String = t match { + case InternalTypes.INT => "int" + case InternalTypes.LONG => "long" + case InternalTypes.SHORT => "short" + case InternalTypes.BYTE => "byte" + case InternalTypes.FLOAT => "float" + case InternalTypes.DOUBLE => "double" + case InternalTypes.BOOLEAN => "boolean" + case InternalTypes.CHAR => "char" + + case InternalTypes.DATE => "int" + case InternalTypes.TIME => "int" + case InternalTypes.TIMESTAMP => "long" + + // TODO: support [INTERVAL_MONTHS] and [INTERVAL_MILLIS] in the future + + case _ => boxedTypeTermForType(t) + } + + def boxedTypeTermForType(t: InternalType): String = t match { + case InternalTypes.INT => className[JInt] + case InternalTypes.LONG => className[JLong] + case InternalTypes.SHORT => className[JShort] + case InternalTypes.BYTE => className[JByte] + case InternalTypes.FLOAT => className[JFloat] + case InternalTypes.DOUBLE => className[JDouble] + case InternalTypes.BOOLEAN => className[JBoolean] + case InternalTypes.CHAR => className[JChar] + + case InternalTypes.DATE => boxedTypeTermForType(InternalTypes.INT) + case InternalTypes.TIME => boxedTypeTermForType(InternalTypes.INT) + case InternalTypes.TIMESTAMP => boxedTypeTermForType(InternalTypes.LONG) + + case InternalTypes.STRING => BINARY_STRING + + // TODO: Support it when we introduce [Decimal] + case _: DecimalType => throw new UnsupportedOperationException + // BINARY is also an ArrayType and uses 
BinaryArray internally too + case _: ArrayType => className[BinaryArray] + case _: MapType => className[BinaryMap] + case _: RowType => className[BaseRow] + + case gt: GenericType[_] => gt.getTypeInfo.getTypeClass.getCanonicalName + } + + /** + * Gets the boxed type term from external type info. + * We only use TypeInformation to store external type info. + */ + def boxedTypeTermForExternalType(t: TypeInformation[_]): String = t.getTypeClass.getCanonicalName + + /** + * Gets the default value for a primitive type, and null for generic types + */ + def primitiveDefaultValue(t: InternalType): String = t match { + case InternalTypes.INT | InternalTypes.BYTE | InternalTypes.SHORT => "-1" + case InternalTypes.LONG => "-1L" + case InternalTypes.FLOAT => "-1.0f" + case InternalTypes.DOUBLE => "-1.0d" + case InternalTypes.BOOLEAN => "false" + case InternalTypes.STRING => s"$BINARY_STRING.EMPTY_UTF8" + case InternalTypes.CHAR => "'\\0'" + + case InternalTypes.DATE | InternalTypes.TIME => "-1" + case InternalTypes.TIMESTAMP => "-1L" + + case _ => "null" + } + + // -------------------------- Method & Enum --------------------------------------- + + def qualifyMethod(method: Method): String = + method.getDeclaringClass.getCanonicalName + "." + method.getName + + def qualifyEnum(enum: Enum[_]): String = + enum.getClass.getCanonicalName + "." 
+ enum.name() + + def compareEnum(term: String, enum: Enum[_]): Boolean = term == qualifyEnum(enum) + + def getEnum(genExpr: GeneratedExpression): Enum[_] = { + val split = genExpr.resultTerm.split('.') + val value = split.last + val clazz = genExpr.resultType.asInstanceOf[GenericType[_]].getTypeClass + enumValueOf(clazz, value) + } + + def enumValueOf[T <: Enum[T]](cls: Class[_], stringValue: String): Enum[_] = + Enum.valueOf(cls.asInstanceOf[Class[T]], stringValue).asInstanceOf[Enum[_]] + + // --------------------------- Require Check --------------------------------------- + + def requireNumeric(genExpr: GeneratedExpression): Unit = + if (!TypeCheckUtils.isNumeric(genExpr.resultType)) { + throw new CodeGenException("Numeric expression type expected, but was " + + s"'${genExpr.resultType}'.") + } + + def requireComparable(genExpr: GeneratedExpression): Unit = + if (!TypeCheckUtils.isComparable(genExpr.resultType)) { + throw new CodeGenException(s"Comparable type expected, but was '${genExpr.resultType}'.") + } + + def requireString(genExpr: GeneratedExpression): Unit = + if (!TypeCheckUtils.isString(genExpr.resultType)) { + throw new CodeGenException("String expression type expected.") + } + + def requireBoolean(genExpr: GeneratedExpression): Unit = + if (!TypeCheckUtils.isBoolean(genExpr.resultType)) { + throw new CodeGenException("Boolean expression type expected.") + } + + def requireArray(genExpr: GeneratedExpression): Unit = + if (!TypeCheckUtils.isArray(genExpr.resultType)) { + throw new CodeGenException("Array expression type expected.") + } + + def requireMap(genExpr: GeneratedExpression): Unit = + if (!TypeCheckUtils.isMap(genExpr.resultType)) { + throw new CodeGenException("Map expression type expected.") + } + + def requireInteger(genExpr: GeneratedExpression): Unit = + if (!TypeCheckUtils.isInteger(genExpr.resultType)) { + throw new CodeGenException("Integer expression type expected.") + } + + // --------------------------- Generate Utils 
--------------------------------------- + + def generateOutputRecordStatement( + t: InternalType, + clazz: Class[_], + outRecordTerm: String, + outRecordWriterTerm: Option[String] = None): String = { + t match { + case rt: RowType if clazz == classOf[BinaryRow] => + val writerTerm = outRecordWriterTerm.getOrElse( + throw new CodeGenException("No writer is specified when writing BinaryRow record.") + ) + val binaryRowWriter = className[BinaryRowWriter] + val typeTerm = clazz.getCanonicalName + s""" + |final $typeTerm $outRecordTerm = new $typeTerm(${rt.getArity}); + |final $binaryRowWriter $writerTerm = new $binaryRowWriter($outRecordTerm); + |""".stripMargin.trim + case rt: RowType if classOf[ObjectArrayRow].isAssignableFrom(clazz) => + val typeTerm = clazz.getCanonicalName + s"final $typeTerm $outRecordTerm = new $typeTerm(${rt.getArity});" + // TODO: support [JoinedRow] in the future + case _ => + val typeTerm = boxedTypeTermForType(t) + s"final $typeTerm $outRecordTerm = new $typeTerm();" + } + } + +} diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/CodeGeneratorContext.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/CodeGeneratorContext.scala new file mode 100644 index 0000000..98f1eb8 --- /dev/null +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/CodeGeneratorContext.scala @@ -0,0 +1,688 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.codegen + +import org.apache.calcite.avatica.util.DateTimeUtils +import org.apache.flink.api.common.functions.{Function, RuntimeContext} +import org.apache.flink.table.`type`.{InternalType, InternalTypes, RowType} +import org.apache.flink.table.api.TableConfig +import org.apache.flink.table.codegen.CodeGenUtils._ +import org.apache.flink.table.dataformat.GenericRow +import org.apache.flink.table.functions.{FunctionContext, UserDefinedFunction} +import org.apache.flink.util.InstantiationUtil + +import scala.collection.mutable + +/** + * The context for code generator, maintaining various reusable statements that could be insert + * into different code sections in the final generated class. 
+ */ +class CodeGeneratorContext(val tableConfig: TableConfig) { + + // holding a list of objects that could be used passed into generated class + val references: mutable.ArrayBuffer[AnyRef] = new mutable.ArrayBuffer[AnyRef]() + + // set of member statements that will be added only once + // we use a LinkedHashSet to keep the insertion order + private val reusableMemberStatements: mutable.LinkedHashSet[String] = + mutable.LinkedHashSet[String]() + + // set of constructor statements that will be added only once + // we use a LinkedHashSet to keep the insertion order + private val reusableInitStatements: mutable.LinkedHashSet[String] = + mutable.LinkedHashSet[String]() + + // set of open statements for RichFunction that will be added only once + // we use a LinkedHashSet to keep the insertion order + private val reusableOpenStatements: mutable.LinkedHashSet[String] = + mutable.LinkedHashSet[String]() + + // set of close statements for RichFunction that will be added only once + // we use a LinkedHashSet to keep the insertion order + private val reusableCloseStatements: mutable.LinkedHashSet[String] = + mutable.LinkedHashSet[String]() + + // set of endInput statements for StreamOperator that will be added only once + // we use a LinkedHashSet to keep the insertion order + private val reusableEndInputStatements: mutable.LinkedHashSet[String] = + mutable.LinkedHashSet[String]() + + // set of statements for cleanup dataview that will be added only once + // we use a LinkedHashSet to keep the insertion order + private val reusableCleanupStatements = mutable.LinkedHashSet[String]() + + // set of statements that will be added only once per record; + // code should only update member variables because local variables are not accessible if + // the code needs to be split; + // we use a LinkedHashSet to keep the insertion order + private val reusablePerRecordStatements: mutable.LinkedHashSet[String] = + mutable.LinkedHashSet[String]() + + // map of initial input unboxing 
expressions that will be added only once + // (inputTerm, index) -> expr + private val reusableInputUnboxingExprs: mutable.Map[(String, Int), GeneratedExpression] = + mutable.Map[(String, Int), GeneratedExpression]() + + // set of constructor statements that will be added only once + // we use a LinkedHashSet to keep the insertion order + private val reusableConstructorStatements: mutable.LinkedHashSet[(String, String)] = + mutable.LinkedHashSet[(String, String)]() + + // set of inner class definition statements that will be added only once + private val reusableInnerClassDefinitionStatements: mutable.Map[String, String] = + mutable.Map[String, String]() + + // map of string constants that will be added only once + // string_constant -> reused_term + private val reusableStringConstants: mutable.Map[String, String] = mutable.Map[String, String]() + + // map of local variable statements. It will be placed in method if method code not excess + // max code length, otherwise will be placed in member area of the class. The statements + // are maintained for multiple methods, so that it's a map from method_name to variables. + // + // method_name -> local_variable_statements + private val reusableLocalVariableStatements = mutable.Map[String, mutable.LinkedHashSet[String]]() + + /** + * The current method name for [[reusableLocalVariableStatements]]. 
You can start a new + * local variable statements for another method using [[startNewLocalVariableStatement()]] + */ + private var currentMethodNameForLocalVariables = "DEFAULT" + + // --------------------------------------------------------------------------------- + // Getter + // --------------------------------------------------------------------------------- + + def getReusableInputUnboxingExprs(inputTerm: String, index: Int): Option[GeneratedExpression] = + reusableInputUnboxingExprs.get((inputTerm, index)) + + def getNullCheck: Boolean = tableConfig.getNullCheck + + // --------------------------------------------------------------------------------- + // Local Variables for Code Split + // --------------------------------------------------------------------------------- + + /** + * Starts a new local variable statements for a generated class with the given method name. + * @param methodName the method name which the fields will be placed into if code is not split. + */ + def startNewLocalVariableStatement(methodName: String): Unit = { + currentMethodNameForLocalVariables = methodName + reusableLocalVariableStatements(methodName) = mutable.LinkedHashSet[String]() + } + + + /** + * Adds a reusable local variable statement with the given type term and field name. + * The local variable statements will be placed in methods or class member area depending + * on whether the method length exceeds the max code length. + * + * The statement is registered under the current method name; if that method has no + * statement set yet, one is created on demand. + * + * @param fieldName the field name prefix + * @param fieldTypeTerm the field type term + * @return a new generated unique field name + */ + def newReusableLocalVariable(fieldTypeTerm: String, fieldName: String): String = { + val fieldTerm = newName(fieldName) + // getOrElseUpdate (not getOrElse): otherwise the statement would be added to a throw-away + // set and silently lost when the current method has no entry in the map yet + reusableLocalVariableStatements + .getOrElseUpdate(currentMethodNameForLocalVariables, mutable.LinkedHashSet[String]()) + .add(s"$fieldTypeTerm $fieldTerm;") + fieldTerm + } + + /** + * Adds multiple pairs of local variables.
+ * The local variable statements will be placed in methods or class + * member area depending on whether the method length exceeds the max code length. + * + * @param fieldTypeAndNames pairs of local variables with + * left is field type term and right is field name + * @return the new generated unique field names for each variable pairs + */ + def newReusableLocalFields(fieldTypeAndNames: (String, String)*): Seq[String] = { + val fieldTerms = newNames(fieldTypeAndNames.map(_._2): _*) + // look the statement set up once; getOrElseUpdate keeps it in the map even when the + // current method has no entry yet, so no statement is lost + val statements = reusableLocalVariableStatements + .getOrElseUpdate(currentMethodNameForLocalVariables, mutable.LinkedHashSet[String]()) + fieldTypeAndNames.map(_._1).zip(fieldTerms).foreach { case (fieldTypeTerm, fieldTerm) => + statements.add(s"$fieldTypeTerm $fieldTerm;") + } + fieldTerms + } + + // --------------------------------------------------------------------------------- + // generate reuse code methods + // --------------------------------------------------------------------------------- + + /** + * @return code block of statements that need to be placed in the member area of the class + * (e.g. inner class definition) + */ + def reuseInnerClassDefinitionCode(): String = { + reusableInnerClassDefinitionStatements.values.mkString("\n") + } + + /** + * @return code block of statements that need to be placed in the member area of the class + * (e.g.
member variables and their initialization) + */ + def reuseMemberCode(): String = { + reusableMemberStatements.mkString("\n") + } + + /** + * @param methodName the method whose local variable statements are returned; null means the + * current method name + * @return code block of statements that will be placed in the member area of the class + * if generated code is split or in local variables of method + * Throws NoSuchElementException (from Map.apply) if no statement set exists for the method. + */ + def reuseLocalVariableCode(methodName: String = null): String = { + if (methodName == null) { + reusableLocalVariableStatements(currentMethodNameForLocalVariables).mkString("\n") + } else { + reusableLocalVariableStatements(methodName).mkString("\n") + } + } + + /** + * @return code block of statements that need to be placed in the constructor + */ + def reuseInitCode(): String = { + reusableInitStatements.mkString("\n") + } + + /** + * @return code block of statements that need to be placed in the per record process block + * (e.g. Function or StreamOperator's processElement) + */ + def reusePerRecordCode(): String = { + reusablePerRecordStatements.mkString("\n") + } + + /** + * @return code block of statements that need to be placed in the open() method + * (e.g. RichFunction or StreamOperator) + */ + def reuseOpenCode(): String = { + reusableOpenStatements.mkString("\n") + } + + /** + * @return code block of statements that need to be placed in the close() method + * (e.g.
RichFunction or StreamOperator) + */ + def reuseCloseCode(): String = { + reusableCloseStatements.mkString("\n") + } + + /** + * @return code block of statements that need to be placed in the endInput() method + * (StreamOperator) + */ + def reuseEndInputCode(): String = { + reusableEndInputStatements.mkString("\n") + } + + /** + * @return code block of statements that need to be placed in the cleanup() method of + * [AggregationsFunction] + */ + def reuseCleanupCode(): String = { + reusableCleanupStatements.mkString("", "\n", "\n") + } + + /** + * @return code block of statements that unbox input variables to a primitive variable + * and a corresponding null flag variable + */ + def reuseInputUnboxingCode(): String = { + reusableInputUnboxingExprs.values.map(_.code).mkString("\n") + } + + /** + * Returns code block of unboxing input variables which belongs to the given inputTerm. + */ + def reuseInputUnboxingCode(inputTerm: String): String = { + val exprs = reusableInputUnboxingExprs.filter { case ((term, _), _) => + inputTerm.equals(term) + } + val codes = for (((_, _), expr) <- exprs) yield expr.code + codes.mkString("\n").trim + } + + /** + * @return code block of constructor statements + */ + def reuseConstructorCode(className: String): String = { + reusableConstructorStatements.map { case (params, body) => + s""" + |public $className($params) throws Exception { + | this(); + | $body + |} + |""".stripMargin + }.mkString("\n") + } + + // --------------------------------------------------------------------------------- + // add reusable code blocks + // --------------------------------------------------------------------------------- + + /** + * Adds a reusable inner class statement with the given class name and class code + */ + def addReusableInnerClass(className: String, statements: String): Unit = { + reusableInnerClassDefinitionStatements(className) = statements + } + + /** + * Adds a reusable member field statement to the member area. 
+ * + * @param memberStatement the member field declare statement + */ + def addReusableMember(memberStatement: String): Unit = { + reusableMemberStatements.add(memberStatement) + } + + /** + * Adds a reusable per record statement + */ + def addReusablePerRecordStatement(s: String): Unit = reusablePerRecordStatements.add(s) + + /** + * Adds a reusable open statement + */ + def addReusableOpenStatement(s: String): Unit = reusableOpenStatements.add(s) + + /** + * Adds a reusable close statement + */ + def addReusableCloseStatement(s: String): Unit = reusableCloseStatements.add(s) + + /** + * Adds a reusable endInput statement + */ + def addReusableEndInputStatement(s: String): Unit = reusableEndInputStatements.add(s) + + /** + * Adds a reusable cleanup statement + */ + def addReusableCleanupStatement(s: String): Unit = reusableCleanupStatements.add(s) + + + /** + * Adds a reusable input unboxing expression + */ + def addReusableInputUnboxingExprs( + inputTerm: String, + index: Int, + expr: GeneratedExpression): Unit = reusableInputUnboxingExprs((inputTerm, index)) = expr + + /** + * Adds a reusable output record statement to member area. + */ + def addReusableOutputRecord( + t: InternalType, + clazz: Class[_], + outRecordTerm: String, + outRecordWriterTerm: Option[String] = None): Unit = { + val statement = generateOutputRecordStatement(t, clazz, outRecordTerm, outRecordWriterTerm) + reusableMemberStatements.add(statement) + } + + /** + * Adds a reusable null [[org.apache.flink.table.dataformat.GenericRow]] to the member area. + */ + def addReusableNullRow(rowTerm: String, arity: Int): Unit = { + addReusableOutputRecord( + new RowType((0 until arity).map(_ => InternalTypes.INT): _*), + classOf[GenericRow], + rowTerm) + } + + /** + * Adds a reusable timestamp to the beginning of the SAM of the generated class. 
+ */ + def addReusableTimestamp(): String = { + val fieldTerm = s"timestamp" + val field = + s""" + |final long $fieldTerm = java.lang.System.currentTimeMillis(); + |""".stripMargin + reusablePerRecordStatements.add(field) + fieldTerm + } + + /** + * Adds a reusable local timestamp to the beginning of the SAM of the generated class. + */ + def addReusableLocalTimestamp(): String = { + addReusableTimestamp() + } + + /** + * Adds a reusable time to the beginning of the SAM of the generated class. + * + * @return the term of the generated int holding the milliseconds elapsed in the current UTC day + */ + def addReusableTime(): String = { + val fieldTerm = s"time" + val timestamp = addReusableTimestamp() + // adopted from org.apache.calcite.runtime.SqlFunctions.currentTime(); + // Math.floorMod already yields the non-negative remainder, so the generated local can stay + // final (re-assigning a final local to correct a negative remainder does not compile) + val field = + s""" + |final int $fieldTerm = (int) java.lang.Math.floorMod( + | $timestamp, ${DateTimeUtils.MILLIS_PER_DAY}L); + |""".stripMargin + reusablePerRecordStatements.add(field) + fieldTerm + } + + /** + * Adds a reusable local time to the beginning of the SAM of the generated class. + */ + def addReusableLocalTime(): String = { + val fieldTerm = s"localtime" + val timeZone = addReusableTimeZone() + val localtimestamp = addReusableLocalTimestamp() + // adopted from org.apache.calcite.runtime.SqlFunctions.localTime() + val field = + s""" + |final int $fieldTerm = (int) ( ($localtimestamp + $timeZone.getOffset($localtimestamp)) + | % ${DateTimeUtils.MILLIS_PER_DAY}); + |""".stripMargin + reusablePerRecordStatements.add(field) + fieldTerm + } + + /** + * Adds a reusable date to the beginning of the SAM of the generated class.
+ * + * @return the term of the generated int holding the days since epoch in the configured time zone + */ + def addReusableDate(): String = { + val fieldTerm = s"date" + val timestamp = addReusableTimestamp() + val timeZone = addReusableTimeZone() + + // adopted from org.apache.calcite.runtime.SqlFunctions.currentDate(): the date is the floor + // division of the local timestamp by the millis per day. Math.floorDiv performs the + // "decrement when the remainder is negative" correction itself, so the generated final local + // is never re-assigned (re-assigning a final local does not compile) + val field = + s""" + |final int $fieldTerm = (int) java.lang.Math.floorDiv( + | $timestamp + $timeZone.getOffset($timestamp), ${DateTimeUtils.MILLIS_PER_DAY}L); + |""".stripMargin + reusablePerRecordStatements.add(field) + fieldTerm + } + + /** + * Adds a reusable TimeZone to the member area of the generated class. + */ + def addReusableTimeZone(): String = { + val zoneID = tableConfig.getTimeZone.getID + val stmt = + s"""private static final java.util.TimeZone $DEFAULT_TIMEZONE_TERM = + | java.util.TimeZone.getTimeZone("$zoneID");""".stripMargin + addReusableMember(stmt) + DEFAULT_TIMEZONE_TERM + } + + + /** + * Adds a reusable [[java.util.Random]] to the member area of the generated class. + * + * The seed parameter must be a literal/constant expression.
+ * + * @param seedExpr optional seed expression; its code is emitted into the init statements. + * When null checks are enabled and the seed is null, an unseeded + * [[java.util.Random]] is created instead. + * @return member variable term + */ + def addReusableRandom(seedExpr: Option[GeneratedExpression]): String = { + val fieldTerm = newName("random") + + val field = + s""" + |final java.util.Random $fieldTerm; + |""".stripMargin + + val fieldInit = seedExpr match { + case Some(s) if getNullCheck => + s""" + |${s.code} + |if (!${s.nullTerm}) { + | $fieldTerm = new java.util.Random(${s.resultTerm}); + |} + |else { + | $fieldTerm = new java.util.Random(); + |} + |""".stripMargin + case Some(s) => + s""" + |${s.code} + |$fieldTerm = new java.util.Random(${s.resultTerm}); + |""".stripMargin + case _ => + s""" + |$fieldTerm = new java.util.Random(); + |""".stripMargin + } + + reusableMemberStatements.add(field) + reusableInitStatements.add(fieldInit) + fieldTerm + } + + /** + * Adds a reusable Object to the member area of the generated class. + * + * A deep copy of the object, made through Java serialization, is appended to `references` + * and assigned to a transient member field by the init statements. + * + * @param obj the object to be added to the generated class; it is serialized, so it + * presumably has to be [[java.io.Serializable]] + * @param fieldNamePrefix prefix field name of the generated member field term + * @param fieldTypeTerm field type class name; if null, the canonical class name of `obj` + * @return the generated unique field term + */ + def addReusableObject( + obj: AnyRef, + fieldNamePrefix: String, + fieldTypeTerm: String = null): String = { + val fieldTerm = newName(fieldNamePrefix) + val clsName = Option(fieldTypeTerm).getOrElse(obj.getClass.getCanonicalName) + addReusableObjectInternal(obj, fieldTerm, clsName) + fieldTerm + } + + /** + * Deep-copies `obj` into the next `references` slot, declares the transient member + * `fieldTerm` of type `fieldTypeTerm` and initializes it from that slot. + */ private def addReusableObjectInternal( + obj: AnyRef, + fieldTerm: String, + fieldTypeTerm: String): Unit = { + val idx = references.length + // make a deep copy of the object + val byteArray = InstantiationUtil.serializeObject(obj) + val objCopy: AnyRef = InstantiationUtil.deserializeObject( + byteArray, + obj.getClass.getClassLoader) + references += objCopy + + reusableMemberStatements.add(s"private transient $fieldTypeTerm $fieldTerm;") + reusableInitStatements.add(s"$fieldTerm = ((($fieldTypeTerm) references[$idx]));") + } + + /** + * Adds a reusable [[UserDefinedFunction]] to the member area
of the generated [[Function]]. + * + * The function is deep-copied into `references`; a call to its `open` method is added to the + * open statements and a call to its `close` method to the close statements. + * + * @param function [[UserDefinedFunction]] object to be instantiated during runtime + * @param contextTerm [[RuntimeContext]] term to access the [[RuntimeContext]]; if null, the + * generated code calls `getRuntimeContext()` instead + * @return member variable term + */ + def addReusableFunction(function: UserDefinedFunction, contextTerm: String = null): String = { + val classQualifier = function.getClass.getCanonicalName + val fieldTerm = s"function_${function.functionIdentifier}" + + addReusableObjectInternal(function, fieldTerm, classQualifier) + + val openFunction = if (contextTerm != null) { + s""" + |$fieldTerm.open(new ${classOf[FunctionContext].getCanonicalName}($contextTerm)); + """.stripMargin + } else { + s""" + |$fieldTerm.open(new ${classOf[FunctionContext].getCanonicalName}(getRuntimeContext())); + """.stripMargin + } + reusableOpenStatements.add(openFunction) + + val closeFunction = + s""" + |$fieldTerm.close(); + """.stripMargin + reusableCloseStatements.add(closeFunction) + + fieldTerm + } + + /** + * Adds a reusable static SLF4J Logger to the member area of the generated class. + * + * @param logTerm name of the generated logger field + * @param clazzTerm logger name passed to `LoggerFactory.getLogger` + */ + def addReusableLogger(logTerm: String, clazzTerm: String): Unit = { + val stmt = + s""" + |private static final org.slf4j.Logger $logTerm = + | org.slf4j.LoggerFactory.getLogger("$clazzTerm"); + |""".stripMargin + reusableMemberStatements.add(stmt) + } + + /** + * Adds a reusable constant to the member area of the generated class.
+ * + * The constant's code is run once by the init statements; its value and null flag are kept + * in final member fields. + * + * @param constant constant expression + * @param nullCheck currently unused: the null flag field is always generated + * @return generated expression with the fieldTerm and nullTerm + */ + def addReusableConstant( + constant: GeneratedExpression, + nullCheck: Boolean): GeneratedExpression = { + require(constant.literal, "Literal expected") + + val fieldTerm = newName("constant") + val nullTerm = fieldTerm + "isNull" + + val fieldType = primitiveTypeTermForType(constant.resultType) + + val field = + s""" + |private final $fieldType $fieldTerm; + |private final boolean $nullTerm; + |""".stripMargin + reusableMemberStatements.add(field) + + val init = + s""" + |${constant.code} + |$fieldTerm = ${constant.resultTerm}; + |$nullTerm = ${constant.nullTerm}; + |""".stripMargin + reusableInitStatements.add(init) + + GeneratedExpression(fieldTerm, nullTerm, "", constant.resultType) + } + + + /** + * Adds a reusable string constant to the member area of the generated class. + * + * Equal values share a single field. The value is escaped before it is embedded, so quotes, + * backslashes and control characters cannot terminate or corrupt the generated Java string + * literal. + * + * @param value the raw, unescaped string value + * @return the term of the generated member field + */ + def addReusableStringConstants(value: String): String = { + reusableStringConstants.get(value) match { + case Some(field) => field + case None => + val field = newName("str") + val escaped = escapeJavaStringLiteral(value) + val stmt = + s""" + |private final $BINARY_STRING $field = $BINARY_STRING.fromString("$escaped"); + |""".stripMargin + reusableMemberStatements.add(stmt) + reusableStringConstants(value) = field + field + } + } + + /** + * Escapes `value` so that it can be placed between double quotes in generated Java code. + * + * Control characters are written as octal escapes rather than unicode escapes, because the + * Java compiler translates unicode escapes before tokenizing, which would turn them back + * into raw line breaks inside the literal. + */ + private def escapeJavaStringLiteral(value: String): String = { + val sb = new StringBuilder(value.length) + value.foreach { + case '"' => sb.append("\\\"") + case '\\' => sb.append("\\\\") + case '\n' => sb.append("\\n") + case '\r' => sb.append("\\r") + case '\t' => sb.append("\\t") + case c if c < ' ' => sb.append("\\%03o".format(c.toInt)) + case c => sb.append(c) + } + sb.toString() + } + + /** + * Adds a reusable MessageDigest to the member area of the generated [[Function]].
+ * + * @param algorithm algorithm name passed to `java.security.MessageDigest.getInstance`; if it is + * not available, the generated init code throws a RuntimeException + * @return member variable term + */ + def addReusableMessageDigest(algorithm: String): String = { + val fieldTerm = newName("messageDigest") + + val field = s"final java.security.MessageDigest $fieldTerm;" + reusableMemberStatements.add(field) + + val fieldInit = + s""" + |try { + | $fieldTerm = java.security.MessageDigest.getInstance("$algorithm"); + |} catch (java.security.NoSuchAlgorithmException e) { + | throw new RuntimeException("Algorithm for '$algorithm' is not available.", e); + |} + |""".stripMargin + reusableInitStatements.add(fieldInit) + + fieldTerm + } + + /** + * Adds a constant SHA2 reusable MessageDigest to the member area of the generated [[Function]]. + * + * @param constant literal expression whose result term is the SHA-2 bit length; the generated + * init code accepts 224, 256, 384 and 512 and throws a RuntimeException otherwise + * @param nullCheck if true, a null constant leaves the digest field set to null + * @return member variable term + */ + def addReusableSha2MessageDigest(constant: GeneratedExpression, nullCheck: Boolean): String = { + require(constant.literal, "Literal expected") + val fieldTerm = newName("messageDigest") + + val field = + s"final java.security.MessageDigest $fieldTerm;" + reusableMemberStatements.add(field) + + val bitLen = constant.resultTerm + val init = + s""" + |if ($bitLen == 224 || $bitLen == 256 || $bitLen == 384 || $bitLen == 512) { + | try { + | $fieldTerm = java.security.MessageDigest.getInstance("SHA-" + $bitLen); + | } catch (java.security.NoSuchAlgorithmException e) { + | throw new RuntimeException( + | "Algorithm for 'SHA-" + $bitLen + "' is not available.", e); + | } + |} else { + | throw new RuntimeException("Unsupported algorithm."); + |} + |""".stripMargin + val nullableInit = if (nullCheck) { + s""" + |${constant.code} + |if (${constant.nullTerm}) { + | $fieldTerm = null; + |} else { + | $init + |} + |""".stripMargin + } else { + s""" + |${constant.code} + |$init + |""".stripMargin + } + reusableInitStatements.add(nullableInit) + + fieldTerm + } +} diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/GeneratedExpression.scala
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/GeneratedExpression.scala new file mode 100644 index 0000000..35a3999 --- /dev/null +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/GeneratedExpression.scala @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.codegen + +import org.apache.flink.table.`type`.InternalType + +/** + * Describes a generated expression. + * + * @param resultTerm term to access the result of the expression + * @param nullTerm boolean term that indicates if expression is null + * @param code code necessary to produce resultTerm and nullTerm + * @param resultType type of the resultTerm + * @param literal flag indicating a constant expression that does not reference any input and + * can thus be used in the member area (e.g.
as constructor parameter of a reusable + * instance) + */ +case class GeneratedExpression( + resultTerm: String, + nullTerm: String, + code: String, + resultType: InternalType, + literal: Boolean = false) + +object GeneratedExpression { + /** Null term that is always `true`, i.e. the expression is always null. */ val ALWAYS_NULL = "true" + /** Null term that is always `false`, i.e. the expression is never null. */ val NEVER_NULL = "false" + /** Empty code, for expressions that need no preparation statements. */ val NO_CODE = "" +} diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/Indenter.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/Indenter.scala new file mode 100644 index 0000000..bff8a62 --- /dev/null +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/Indenter.scala @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.flink.table.codegen + +/** + * Provides the `j` string interpolator. Every interpolated argument that spans several lines + * has its continuation lines indented like the text preceding the argument on its own line + * (whitespace, optionally followed by a stripMargin `|`). + */ +class IndentStringContext(sc: StringContext) { + def j(args: Any*): String = { + val out = new StringBuilder() + sc.parts.zip(args).foreach { case (part, arg) => + out.append(part) + val indent = getindent(part) + val text = arg.toString + /* continuation lines of the argument inherit the indentation of its first line */ + out.append(if (indent.isEmpty) text else text.replaceAll("\n", "\n" + indent)) + } + if (sc.parts.size > args.size) { + out.append(sc.parts.last) + } + out.toString() + } + + /** + * Returns the text after the last line break of `str` when that text consists only of + * whitespace, or of whitespace around a single `|`; returns the empty string otherwise and + * when `str` contains no line break. + */ + def getindent(str: String): String = str.lastIndexOf("\n") match { + case -1 => "" + case lastBreak => + val tail = str.substring(lastBreak + 1) + tail.trim match { + case "" | "|" => tail + case _ => "" + } + } +} + +/** Import `Indenter._` to enable the `j` interpolator on string literals. */ +object Indenter { + implicit def toISC(sc: StringContext): IndentStringContext = new IndentStringContext(sc) +} diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/typeutils/TypeCheckUtils.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/typeutils/TypeCheckUtils.scala new file mode 100644 index 0000000..41676a8 --- /dev/null +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/typeutils/TypeCheckUtils.scala @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.typeutils + +import org.apache.flink.table.`type`._ + +/** + * Predicates that classify [[InternalType]]s. + */ +object TypeCheckUtils { + + /** Non-decimal numeric types recognized by [[isNumeric]] (compared with `==`). */ + private val numericPrimitives: Seq[InternalType] = Seq( + InternalTypes.BYTE, InternalTypes.SHORT, InternalTypes.INT, + InternalTypes.LONG, InternalTypes.FLOAT, InternalTypes.DOUBLE) + + /** True for BYTE, SHORT, INT, LONG, FLOAT, DOUBLE and every [[DecimalType]]. */ + def isNumeric(dataType: InternalType): Boolean = + isDecimal(dataType) || numericPrimitives.exists(_ == dataType) + + /** True for `InternalTypes.STRING`. */ + def isString(dataType: InternalType): Boolean = dataType == InternalTypes.STRING + + /** True for `InternalTypes.BINARY`. */ + def isBinary(dataType: InternalType): Boolean = dataType == InternalTypes.BINARY + + /** True for `InternalTypes.BOOLEAN`. */ + def isBoolean(dataType: InternalType): Boolean = dataType == InternalTypes.BOOLEAN + + /** True for every [[DecimalType]]. */ + def isDecimal(dataType: InternalType): Boolean = dataType match { + case _: DecimalType => true + case _ => false + } + + /** True for `InternalTypes.INT`. */ + def isInteger(dataType: InternalType): Boolean = dataType == InternalTypes.INT + + /** True for `InternalTypes.LONG`. */ + def isLong(dataType: InternalType): Boolean = dataType == InternalTypes.LONG + + /** True for every [[ArrayType]]. */ + def isArray(dataType: InternalType): Boolean = dataType match { + case _: ArrayType => true + case _ => false + } + + /** True for every [[MapType]]. */ + def isMap(dataType: InternalType): Boolean = dataType match { + case _: MapType => true + case _ => false + } + + /** False for generic, map, row and array types; true for every other type. */ + def isComparable(dataType: InternalType): Boolean = dataType match { + case _: GenericType[_] | _: MapType | _: RowType => false + case _ => !isArray(dataType) + } + +} diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/generated/CompileUtils.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/generated/CompileUtils.java new file mode 100644 index 0000000..c81d2cc --- /dev/null +++
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/generated/CompileUtils.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.generated; + +import org.apache.flink.api.common.InvalidProgramException; +import org.apache.flink.api.java.tuple.Tuple2; + +import org.apache.flink.shaded.guava18.com.google.common.cache.Cache; +import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder; + +import org.codehaus.janino.SimpleCompiler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Utilities to compile generated code into a {@link Class}. + */ +public final class CompileUtils { + + // used for logging the generated codes to a same place + private static final Logger CODE_LOG = LoggerFactory.getLogger(CompileUtils.class); + + /** + * Cache of compiled classes. Janino creates a new class loader and a new class file for every + * compilation (which guarantees that class names never clash).
When many tasks of the same process + * compile the same generated code, this produces a large number of duplicate classes and + * frequent metaspace GC (class unloading), which becomes a performance bottleneck; this cache + * avoids that. + * + * <p>Entries are keyed by (class loader, class name) and keep strong references to both the + * class loader and the class until they are evicted. + */ + protected static final Cache<Tuple2<ClassLoader, String>, Class<?>> COMPILED_CACHE = CacheBuilder + .newBuilder() + .maximumSize(100) // estimated cache size + .build(); + + private CompileUtils() { + // utility class, not meant to be instantiated + } + + /** + * Compiles a generated code to a Class. + * + * <p>Results are cached per (class loader, class name). The code is not part of the cache + * key, so a class name must always be used with the same code within one class loader. + * + * @param cl the ClassLoader used to load the class, must not be null + * @param name the class name + * @param code the generated code + * @param <T> the class type + * @return the compiled class + * @throws InvalidProgramException if the code cannot be compiled + */ + public static <T> Class<T> compile(ClassLoader cl, String name, String code) { + checkNotNull(cl, "Classloader must not be null."); + Tuple2<ClassLoader, String> cacheKey = Tuple2.of(cl, name); + Class<?> clazz = COMPILED_CACHE.getIfPresent(cacheKey); + if (clazz == null) { + // lookup and insert are not atomic: concurrent misses may compile the class twice, + // in which case the last put wins + clazz = doCompile(cl, name, code); + COMPILED_CACHE.put(cacheKey, clazz); + } + //noinspection unchecked + return (Class<T>) clazz; + } + + private static <T> Class<T> doCompile(ClassLoader cl, String name, String code) { + checkNotNull(cl, "Classloader must not be null."); + // SLF4J substitutes {} placeholders, not printf-style ones such as %s + CODE_LOG.debug("Compiling: {} \n\n Code:\n{}", name, code); + SimpleCompiler compiler = new SimpleCompiler(); + compiler.setParentClassLoader(cl); + try { + compiler.cook(code); + } catch (Throwable t) { + throw new InvalidProgramException( + "Table program cannot be compiled. This is a bug.
Please file an issue.", t); + } + try { + //noinspection unchecked + return (Class<T>) compiler.getClassLoader().loadClass(name); + } catch (ClassNotFoundException e) { + throw new RuntimeException("Can not load class " + name, e); + } + } +} diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/generated/GeneratedClass.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/generated/GeneratedClass.java new file mode 100644 index 0000000..8f26e14 --- /dev/null +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/generated/GeneratedClass.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.generated; + +import java.io.Serializable; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * A wrapper for a generated class. It keeps the class name, the generated source code and the + * objects referenced by that code, and defines a {@link #newInstance(ClassLoader)} method to + * compile the code and instantiate it with those references.
+ */ +public abstract class GeneratedClass<T> implements Serializable { + + private static final long serialVersionUID = 1L; + + private final String className; + private final String code; + private final Object[] references; + + private transient Class<T> compiledClass; + + protected GeneratedClass(String className, String code, Object[] references) { + checkNotNull(className, "name must not be null"); + checkNotNull(code, "code must not be null"); + checkNotNull(references, "references must not be null"); + this.className = className; + this.code = code; + this.references = references; + } + + /** + * Create a new instance of this generated class. + * + * <p>The generated class must declare a public constructor that takes a single + * {@code Object[]}; it receives the references held by this wrapper. + * + * @param classLoader class loader used to compile the class on the first call + * @return a new instance of the generated class + * @throws RuntimeException if the class cannot be compiled, loaded or instantiated + */ + @SuppressWarnings("unchecked") + public T newInstance(ClassLoader classLoader) { + try { + // Constructor#newInstance is varargs: passing the Object[] directly would spread the + // references over the constructor parameters, so pass it as the single argument + return (T) compile(classLoader).getConstructor(Object[].class) + .newInstance(new Object[] {references}); + } catch (Exception e) { + throw new RuntimeException( + "Could not instantiate generated class '" + className + "'", e); + } + } + + /** + * Compiles the class on the first call and keeps it in a transient field; later calls return + * that class even if a different class loader is passed. + */ + private Class<?> compile(ClassLoader classLoader) { + if (compiledClass == null) { + // cache the compiled class + compiledClass = CompileUtils.compile(classLoader, className, code); + } + return compiledClass; + } +} diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/generated/GeneratedCollector.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/generated/GeneratedCollector.java new file mode 100644 index 0000000..ce082ba --- /dev/null +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/generated/GeneratedCollector.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.generated; + +import org.apache.flink.util.Collector; + +/** + * Describes a generated {@link Collector}: the class name and source code of the generated + * class together with the objects it references. Instances are created through + * {@link GeneratedClass#newInstance(ClassLoader)}. + * + * @param <C> type of collector + */ +public final class GeneratedCollector<C extends Collector<?>> extends GeneratedClass<C> { + + private static final long serialVersionUID = -7355875544905245676L; + + /** + * Creates a GeneratedCollector. + * + * @param className class name of the generated Collector. + * @param code code of the generated Collector. + * @param references referenced objects of the generated Collector. + */ + public GeneratedCollector(String className, String code, Object[] references) { + super(className, code, references); + } +} diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/generated/GeneratedFunction.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/generated/GeneratedFunction.java new file mode 100644 index 0000000..e43da42 --- /dev/null +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/generated/GeneratedFunction.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.generated; + +import org.apache.flink.api.common.functions.Function; + +/** + * Describes a generated {@link Function}: the class name and source code of the generated + * class together with the objects it references. Instances are created through + * {@link GeneratedClass#newInstance(ClassLoader)}. + * + * @param <F> type of Function + */ +public final class GeneratedFunction<F extends Function> extends GeneratedClass<F> { + + private static final long serialVersionUID = -7355875544905245676L; + + /** + * Creates a GeneratedFunction. + * + * @param className class name of the generated Function. + * @param code code of the generated Function. + * @param references referenced objects of the generated Function. + */ + public GeneratedFunction(String className, String code, Object[] references) { + super(className, code, references); + } +} diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/generated/GeneratedInput.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/generated/GeneratedInput.java new file mode 100644 index 0000000..9331bb3 --- /dev/null +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/generated/GeneratedInput.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.generated; + +import org.apache.flink.api.common.io.InputFormat; + +/** + * Describes a generated {@link InputFormat}: the class name and source code of the generated + * class together with the objects it references. Instances are created through + * {@link GeneratedClass#newInstance(ClassLoader)}. + * + * @param <F> type of InputFormat + */ +public final class GeneratedInput<F extends InputFormat<?, ?>> extends GeneratedClass<F> { + + private static final long serialVersionUID = -7355875544905245676L; + + /** + * Creates a GeneratedInput. + * + * @param className class name of the generated InputFormat. + * @param code code of the generated InputFormat. + * @param references referenced objects of the generated InputFormat. + */ + public GeneratedInput(String className, String code, Object[] references) { + super(className, code, references); + } +} diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/generated/CompileUtilsTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/generated/CompileUtilsTest.java new file mode 100644 index 0000000..2e67713 --- /dev/null +++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/generated/CompileUtilsTest.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.generated; + +import org.apache.flink.api.common.InvalidProgramException; + +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +import java.net.URL; +import java.net.URLClassLoader; + +import static org.junit.Assert.assertNotSame; +import static org.junit.Assert.assertSame; + +/** + * Tests for {@link CompileUtils}: reuse of cached classes per class loader and the exception + * raised for code that does not compile. + */ +public class CompileUtilsTest { + + @Rule + public ExpectedException thrown = ExpectedException.none(); + + @Before + public void before() { + // COMPILED_CACHE is static, so clear it to keep the tests independent of each other + CompileUtils.COMPILED_CACHE.invalidateAll(); + } + + /** Same class loader and class name hit the cache; another class loader compiles anew. */ @Test + public void testCacheReuse() { + String code = + "public class Main {\n" + + " int i;\n" + + " int j;\n" + + "}"; + + Class<?> class1 = CompileUtils.compile(this.getClass().getClassLoader(), "Main", code); + Class<?> class2 = CompileUtils.compile(this.getClass().getClassLoader(), "Main", code); + Class<?> class3 = CompileUtils.compile(new TestClassLoader(), "Main", code); + assertSame(class1, class2); + assertNotSame(class1, class3); + } + + /** Code with a syntax error must fail with an {@link InvalidProgramException}. */ @Test + public void testWrongCode() { + String code = + "public class111 Main {\n" + + " int i;\n" + + " int j;\n" + + "}"; + + thrown.expect(InvalidProgramException.class); + thrown.expectMessage("Table program cannot be compiled. This is a bug.
Please file an issue."); + CompileUtils.compile(this.getClass().getClassLoader(), "Main", code); + } + + /** A separate class loader, so compiling with it misses the test class loader's cache entry. */ private static class TestClassLoader extends URLClassLoader { + + TestClassLoader() { + super(new URL[0], Thread.currentThread().getContextClassLoader()); + } + } +}