Repository: flink
Updated Branches:
  refs/heads/master ef0653aa8 -> 527e7499c
http://git-wip-us.apache.org/repos/asf/flink/blob/527e7499/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
index 82f0051..c9f98e3 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
@@ -35,7 +35,7 @@ import org.apache.flink.streaming.api.windowing.windows.{Window => DataStreamWin
 import org.apache.flink.table.api.{StreamQueryConfig, TableException}
 import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
 import org.apache.flink.table.calcite.FlinkTypeFactory
-import org.apache.flink.table.codegen.CodeGenerator
+import org.apache.flink.table.codegen.{AggregationCodeGenerator, CodeGenerator}
 import org.apache.flink.table.expressions.ExpressionUtils.isTimeIntervalLiteral
 import org.apache.flink.table.expressions._
 import org.apache.flink.table.functions.aggfunctions._
@@ -71,7 +71,7 @@ object AggregateUtil {
     * @param isRowsClause It is a tag that indicates whether the OVER clause is ROWS clause
     */
   private[flink] def createUnboundedOverProcessFunction(
-      generator: CodeGenerator,
+      generator: AggregationCodeGenerator,
       namedAggregates: Seq[CalcitePair[AggregateCall, String]],
       inputType: RelDataType,
       inputTypeInfo: TypeInformation[Row],
@@ -150,7 +150,7 @@ object AggregateUtil {
     * @return [[org.apache.flink.streaming.api.functions.ProcessFunction]]
     */
   private[flink] def createGroupAggregateFunction(
-      generator: CodeGenerator,
+      generator: AggregationCodeGenerator,
       namedAggregates: Seq[CalcitePair[AggregateCall, String]],
       inputRowType: RelDataType,
       inputFieldTypes: Seq[TypeInformation[_]],
@@ -211,7 +211,7 @@ object AggregateUtil {
     * @return [[org.apache.flink.streaming.api.functions.ProcessFunction]]
     */
   private[flink] def createBoundedOverProcessFunction(
-      generator: CodeGenerator,
+      generator: AggregationCodeGenerator,
       namedAggregates: Seq[CalcitePair[AggregateCall, String]],
       inputType: RelDataType,
       inputTypeInfo: TypeInformation[Row],
@@ -312,7 +312,7 @@ object AggregateUtil {
     * NOTE: this function is only used for time based window on batch tables.
     */
   def createDataSetWindowPrepareMapFunction(
-      generator: CodeGenerator,
+      generator: AggregationCodeGenerator,
       window: LogicalWindow,
       namedAggregates: Seq[CalcitePair[AggregateCall, String]],
       groupings: Array[Int],
@@ -418,7 +418,7 @@ object AggregateUtil {
     * NOTE: this function is only used for sliding windows with partial aggregates on batch tables.
     */
   def createDataSetSlideWindowPrepareGroupReduceFunction(
-      generator: CodeGenerator,
+      generator: AggregationCodeGenerator,
       window: LogicalWindow,
       namedAggregates: Seq[CalcitePair[AggregateCall, String]],
       groupings: Array[Int],
@@ -530,7 +530,7 @@ object AggregateUtil {
     * NOTE: this function is only used for window on batch tables.
     */
   def createDataSetWindowAggregationGroupReduceFunction(
-      generator: CodeGenerator,
+      generator: AggregationCodeGenerator,
       window: LogicalWindow,
       namedAggregates: Seq[CalcitePair[AggregateCall, String]],
       physicalInputRowType: RelDataType,
@@ -681,7 +681,7 @@ object AggregateUtil {
     *
     */
   def createDataSetWindowAggregationMapPartitionFunction(
-      generator: CodeGenerator,
+      generator: AggregationCodeGenerator,
       window: LogicalWindow,
       namedAggregates: Seq[CalcitePair[AggregateCall, String]],
       physicalInputRowType: RelDataType,
@@ -754,7 +754,7 @@ object AggregateUtil {
     *
     */
   private[flink] def createDataSetWindowAggregationCombineFunction(
-      generator: CodeGenerator,
+      generator: AggregationCodeGenerator,
       window: LogicalWindow,
       namedAggregates: Seq[CalcitePair[AggregateCall, String]],
       physicalInputRowType: RelDataType,
@@ -819,7 +819,7 @@ object AggregateUtil {
     * respective output type are generated as well.
     */
   private[flink] def createDataSetAggregateFunctions(
-      generator: CodeGenerator,
+      generator: AggregationCodeGenerator,
       namedAggregates: Seq[CalcitePair[AggregateCall, String]],
       inputType: RelDataType,
       inputFieldTypeInfo: Seq[TypeInformation[_]],
@@ -992,7 +992,7 @@ object AggregateUtil {
   }
 
   private[flink] def createDataStreamAggregateFunction(
-      generator: CodeGenerator,
+      generator: AggregationCodeGenerator,
       namedAggregates: Seq[CalcitePair[AggregateCall, String]],
       inputType: RelDataType,
       inputFieldTypeInfo: Seq[TypeInformation[_]],

http://git-wip-us.apache.org/repos/asf/flink/blob/527e7499/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/ExpressionTestBase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/ExpressionTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/ExpressionTestBase.scala
index e3e292e..5f274db 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/ExpressionTestBase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/ExpressionTestBase.scala
@@ -40,7 +40,7 @@ import org.apache.flink.configuration.Configuration
 import org.apache.flink.core.fs.Path
 import org.apache.flink.table.api.{BatchTableEnvironment, TableConfig, TableEnvironment}
 import org.apache.flink.table.calcite.FlinkPlannerImpl
-import org.apache.flink.table.codegen.{CodeGenerator, Compiler, GeneratedFunction}
+import org.apache.flink.table.codegen.{FunctionCodeGenerator, Compiler, GeneratedFunction}
 import org.apache.flink.table.expressions.{Expression, ExpressionParser}
 import org.apache.flink.table.functions.ScalarFunction
 import org.apache.flink.table.plan.nodes.FlinkConventions
@@ -115,7 +115,7 @@ abstract class ExpressionTestBase {
   def evaluateExprs() = {
     val relBuilder = context._1
     val config = new TableConfig()
-    val generator = new CodeGenerator(config, false, typeInfo)
+    val generator = new FunctionCodeGenerator(config, false, typeInfo)
 
     // cast expressions to String
     val stringTestExprs = testExprs.map(expr => relBuilder.cast(expr._1, VARCHAR))