[FLINK-6091] [table] Implement and turn on retraction for non-windowed aggregates.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/856485be Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/856485be Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/856485be Branch: refs/heads/table-retraction Commit: 856485be3be3bb240a068019fe21da3556f8b26f Parents: 455a3c5 Author: Hequn Cheng <chenghe...@gmail.com> Authored: Tue Apr 18 16:54:09 2017 +0800 Committer: Fabian Hueske <fhue...@apache.org> Committed: Wed May 3 11:33:07 2017 +0200 ---------------------------------------------------------------------- .../flink/table/api/BatchTableEnvironment.scala | 63 +++- .../table/api/StreamTableEnvironment.scala | 62 +++- .../flink/table/api/TableEnvironment.scala | 65 ++-- .../flink/table/plan/nodes/CommonCalc.scala | 26 +- .../table/plan/nodes/CommonCorrelate.scala | 58 +--- .../flink/table/plan/nodes/CommonScan.scala | 24 +- .../table/plan/nodes/dataset/BatchScan.scala | 10 +- .../table/plan/nodes/dataset/DataSetCalc.scala | 23 +- .../plan/nodes/dataset/DataSetCorrelate.scala | 32 +- .../plan/nodes/dataset/DataSetValues.scala | 2 +- .../plan/nodes/datastream/DataStreamCalc.scala | 30 +- .../nodes/datastream/DataStreamCorrelate.scala | 38 ++- .../datastream/DataStreamGroupAggregate.scala | 24 +- .../DataStreamGroupWindowAggregate.scala | 47 ++- .../datastream/DataStreamOverAggregate.scala | 62 ++-- .../plan/nodes/datastream/DataStreamRel.scala | 6 +- .../plan/nodes/datastream/DataStreamScan.scala | 3 +- .../plan/nodes/datastream/DataStreamUnion.scala | 3 +- .../nodes/datastream/DataStreamValues.scala | 14 +- .../plan/nodes/datastream/StreamScan.scala | 23 +- .../datastream/StreamTableSourceScan.scala | 4 +- .../plan/nodes/logical/FlinkLogicalCalc.scala | 2 +- .../datastream/DataStreamRetractionRules.scala | 16 +- .../runtime/CRowCorrelateFlatMapRunner.scala | 83 +++++ .../flink/table/runtime/CRowFlatMapRunner.scala | 72 ++++ .../table/runtime/CRowInputMapRunner.scala | 57 ++++ 
.../table/runtime/CRowOutputMapRunner.scala | 60 ++++ .../table/runtime/CRowWrappingCollector.scala | 41 +++ .../flink/table/runtime/FlatMapRunner.scala | 17 +- .../aggregate/AggregateAggFunction.scala | 15 +- .../table/runtime/aggregate/AggregateUtil.scala | 47 +-- ...SetSessionWindowAggReduceGroupFunction.scala | 4 +- ...taSetSlideWindowAggReduceGroupFunction.scala | 4 +- ...TumbleTimeWindowAggReduceGroupFunction.scala | 4 +- .../aggregate/GroupAggProcessFunction.scala | 58 +++- ...rementalAggregateAllTimeWindowFunction.scala | 7 +- .../IncrementalAggregateAllWindowFunction.scala | 11 +- ...IncrementalAggregateTimeWindowFunction.scala | 7 +- .../IncrementalAggregateWindowFunction.scala | 13 +- .../aggregate/ProcTimeBoundedRangeOver.scala | 30 +- .../aggregate/ProcTimeBoundedRowsOver.scala | 26 +- .../ProcTimeUnboundedNonPartitionedOver.scala | 23 +- .../ProcTimeUnboundedPartitionedOver.scala | 22 +- .../aggregate/RowTimeBoundedRangeOver.scala | 32 +- .../aggregate/RowTimeBoundedRowsOver.scala | 30 +- .../aggregate/RowTimeUnboundedOver.scala | 50 +-- .../aggregate/TimeWindowPropertyCollector.scala | 34 +- .../runtime/io/CRowValuesInputFormat.scala | 59 ++++ .../table/runtime/io/ValuesInputFormat.scala | 17 +- .../apache/flink/table/runtime/types/CRow.scala | 55 +++ .../table/runtime/types/CRowComparator.scala | 83 +++++ .../table/runtime/types/CRowSerializer.scala | 78 +++++ .../table/runtime/types/CRowTypeInfo.scala | 98 ++++++ .../apache/flink/table/sinks/CsvTableSink.scala | 2 + .../flink/table/TableEnvironmentTest.scala | 72 +++- .../scala/batch/TableEnvironmentITCase.scala | 20 ++ .../api/scala/stream/RetractionITCase.scala | 331 +++++++++++++++++++ .../api/scala/stream/TableSinkITCase.scala | 2 +- .../api/scala/stream/utils/StreamITCase.scala | 11 +- ...ProcessingOverRangeProcessFunctionTest.scala | 105 +++--- .../runtime/types/CRowComparatorTest.scala | 61 ++++ .../table/utils/MockTableEnvironment.scala | 8 + 62 files changed, 1896 insertions(+), 490 
deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/856485be/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala index 00cf11c..34f5018 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala @@ -26,16 +26,20 @@ import org.apache.calcite.rel.RelNode import org.apache.calcite.rel.`type`.RelDataType import org.apache.calcite.sql2rel.RelDecorrelator import org.apache.calcite.tools.RuleSet +import org.apache.flink.api.common.functions.{MapFunction, RichMapFunction} import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.io.DiscardingOutputFormat import org.apache.flink.api.java.typeutils.GenericTypeInfo import org.apache.flink.api.java.{DataSet, ExecutionEnvironment} +import org.apache.flink.configuration.Configuration import org.apache.flink.table.explain.PlanJsonParser import org.apache.flink.table.expressions.Expression import org.apache.flink.table.plan.nodes.FlinkConventions import org.apache.flink.table.plan.nodes.dataset.DataSetRel import org.apache.flink.table.plan.rules.FlinkRuleSets import org.apache.flink.table.plan.schema.{DataSetTable, TableSourceTable} +import org.apache.flink.table.runtime.MapRunner +import org.apache.flink.table.runtime.types.CRow import org.apache.flink.table.sinks.{BatchTableSink, TableSink} import org.apache.flink.table.sources.{BatchTableSource, TableSource} import org.apache.flink.types.Row @@ -128,6 +132,56 @@ abstract class 
BatchTableEnvironment( } /** + * Creates a final converter that maps the internal row type to external type. + * + * @param physicalTypeInfo the input of the sink + * @param logicalRowType the logical type with correct field names (esp. for POJO field mapping) + * @param requestedTypeInfo the output type of the sink + * @param functionName name of the map function. Must not be unique but has to be a + * valid Java class identifier. + */ + override protected def getConversionMapper[IN, OUT]( + physicalTypeInfo: TypeInformation[IN], + logicalRowType: RelDataType, + requestedTypeInfo: TypeInformation[OUT], + functionName: String): + Option[MapFunction[IN, OUT]] = { + + if (requestedTypeInfo.getTypeClass == classOf[Row]) { + // Row to Row, no conversion needed + None + } else if (requestedTypeInfo.getTypeClass == classOf[CRow]) { + // Row to CRow, only needs to be wrapped + Some( + new RichMapFunction[Row, CRow] { + private var outCRow: CRow = _ + override def open(parameters: Configuration): Unit = outCRow = new CRow(null, true) + override def map(value: Row): CRow = { + outCRow.row = value + outCRow + } + }.asInstanceOf[MapFunction[IN, OUT]] + ) + } else { + // some type that is neither Row or CRow + + val converterFunction = generateRowConverterFunction[OUT]( + physicalTypeInfo.asInstanceOf[TypeInformation[Row]], + logicalRowType, + requestedTypeInfo, + functionName + ) + + val mapFunction = new MapRunner[IN, OUT]( + converterFunction.name, + converterFunction.code, + converterFunction.returnType) + + Some(mapFunction) + } + } + + /** * Returns the AST of the specified Table API and SQL queries and the execution plan to compute * the result of the given [[Table]]. 
* @@ -285,10 +339,15 @@ abstract class BatchTableEnvironment( logicalPlan match { case node: DataSetRel => val plan = node.translateToPlan(this) - val conversion = sinkConversion(plan.getType, logicalType, tpe, "DataSetSinkConversion") + val conversion = + getConversionMapper(plan.getType, logicalType, tpe, "DataSetSinkConversion") conversion match { case None => plan.asInstanceOf[DataSet[A]] // no conversion necessary - case Some(mapFunction) => plan.map(mapFunction).name(s"to: $tpe") + case Some(mapFunction: MapFunction[Row, A]) => + plan.map(mapFunction) + .returns(tpe) + .name(s"to: ${tpe.getTypeClass.getSimpleName}") + .asInstanceOf[DataSet[A]] } case _ => http://git-wip-us.apache.org/repos/asf/flink/blob/856485be/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala index f532c5b..e2bccf3 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala @@ -26,6 +26,7 @@ import org.apache.calcite.rel.RelNode import org.apache.calcite.rel.`type`.RelDataType import org.apache.calcite.sql2rel.RelDecorrelator import org.apache.calcite.tools.{RuleSet, RuleSets} +import org.apache.flink.api.common.functions.MapFunction import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.typeutils.GenericTypeInfo import org.apache.flink.streaming.api.datastream.DataStream @@ -36,6 +37,8 @@ import org.apache.flink.table.plan.nodes.FlinkConventions import org.apache.flink.table.plan.nodes.datastream.DataStreamRel import org.apache.flink.table.plan.rules.FlinkRuleSets import 
org.apache.flink.table.plan.schema.{DataStreamTable, TableSourceTable} +import org.apache.flink.table.runtime.CRowInputMapRunner +import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo} import org.apache.flink.table.sinks.{StreamTableSink, TableSink} import org.apache.flink.table.sources.{StreamTableSource, TableSource} import org.apache.flink.types.Row @@ -169,6 +172,52 @@ abstract class StreamTableEnvironment( } } + + /** + * Creates a final converter that maps the internal row type to external type. + * + * @param physicalTypeInfo the input of the sink + * @param logicalRowType the logical type with correct field names (esp. for POJO field mapping) + * @param requestedTypeInfo the output type of the sink + * @param functionName name of the map function. Must not be unique but has to be a + * valid Java class identifier. + */ + override protected def getConversionMapper[IN, OUT]( + physicalTypeInfo: TypeInformation[IN], + logicalRowType: RelDataType, + requestedTypeInfo: TypeInformation[OUT], + functionName: String): + Option[MapFunction[IN, OUT]] = { + + if (requestedTypeInfo.getTypeClass == classOf[CRow]) { + // CRow to CRow, no conversion needed + None + } else if (requestedTypeInfo.getTypeClass == classOf[Row]) { + // CRow to Row, only needs to be unwrapped + Some( + new MapFunction[CRow, Row] { + override def map(value: CRow): Row = value.row + }.asInstanceOf[MapFunction[IN, OUT]] + ) + } else { + // Some type that is neither CRow nor Row + + val converterFunction = generateRowConverterFunction[OUT]( + physicalTypeInfo.asInstanceOf[CRowTypeInfo].rowType, + logicalRowType, + requestedTypeInfo, + functionName + ) + + Some(new CRowInputMapRunner[OUT]( + converterFunction.name, + converterFunction.code, + converterFunction.returnType) + .asInstanceOf[MapFunction[IN, OUT]]) + + } + } + /** * Registers a [[DataStream]] as a table under a given name in the [[TableEnvironment]]'s * catalog. 
@@ -334,10 +383,15 @@ abstract class StreamTableEnvironment( logicalPlan match { case node: DataStreamRel => val plan = node.translateToPlan(this) - val conversion = sinkConversion(plan.getType, logicalType, tpe, "DataStreamSinkConversion") + val conversion = + getConversionMapper(plan.getType, logicalType, tpe, "DataStreamSinkConversion") conversion match { case None => plan.asInstanceOf[DataStream[A]] // no conversion necessary - case Some(mapFunction) => plan.map(mapFunction).name(s"to: $tpe") + case Some(mapFunction: MapFunction[CRow, A]) => + plan.map(mapFunction) + .returns(tpe) + .name(s"to: ${tpe.getTypeClass.getSimpleName}") + .asInstanceOf[DataStream[A]] } case _ => @@ -355,9 +409,9 @@ abstract class StreamTableEnvironment( def explain(table: Table): String = { val ast = table.getRelNode val optimizedPlan = optimize(ast) - val dataStream = translate[Row]( + val dataStream = translate[CRow]( optimizedPlan, - ast.getRowType)(new GenericTypeInfo(classOf[Row])) + ast.getRowType)(new GenericTypeInfo(classOf[CRow])) val env = dataStream.getExecutionEnvironment val jsonSqlPlan = env.getExecutionPlan http://git-wip-us.apache.org/repos/asf/flink/blob/856485be/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala index bd974b0..0a37f00 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala @@ -48,7 +48,7 @@ import org.apache.flink.table.api.java.{BatchTableEnvironment => JavaBatchTableE import org.apache.flink.table.api.scala.{BatchTableEnvironment => ScalaBatchTableEnv, StreamTableEnvironment => 
ScalaStreamTableEnv} import org.apache.flink.table.calcite.{FlinkPlannerImpl, FlinkRelBuilder, FlinkTypeFactory, FlinkTypeSystem} import org.apache.flink.table.catalog.{ExternalCatalog, ExternalCatalogSchema} -import org.apache.flink.table.codegen.{CodeGenerator, ExpressionReducer} +import org.apache.flink.table.codegen.{CodeGenerator, ExpressionReducer, GeneratedFunction} import org.apache.flink.table.expressions.{Alias, Expression, UnresolvedFieldReference} import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils.{checkForInstantiation, checkNotSingleton, createScalarSqlFunction, createTableSqlFunctions} import org.apache.flink.table.functions.{ScalarFunction, TableFunction} @@ -56,7 +56,7 @@ import org.apache.flink.table.plan.cost.DataSetCostFactory import org.apache.flink.table.plan.logical.{CatalogNode, LogicalRelNode} import org.apache.flink.table.plan.rules.FlinkRuleSets import org.apache.flink.table.plan.schema.RelTable -import org.apache.flink.table.runtime.MapRunner +import org.apache.flink.table.runtime.types.CRowTypeInfo import org.apache.flink.table.sinks.TableSink import org.apache.flink.table.sources.{DefinedFieldNames, TableSource} import org.apache.flink.table.validate.FunctionCatalog @@ -615,6 +615,18 @@ abstract class TableEnvironment(val config: TableConfig) { case _ => throw new TableException( "Field reference expression or alias on field expression expected.") } + case cr: CRowTypeInfo => + exprs.zipWithIndex.map { + case (UnresolvedFieldReference(name), idx) => (idx, name) + case (Alias(UnresolvedFieldReference(origName), name, _), _) => + val idx = cr.getFieldIndex(origName) + if (idx < 0) { + throw new TableException(s"$origName is not a field of type $cr") + } + (idx, name) + case _ => throw new TableException( + "Field reference expression or alias on field expression expected.") + } case c: CaseClassTypeInfo[A] => exprs.zipWithIndex.map { case (UnresolvedFieldReference(name), idx) => (idx, name) @@ -660,37 +672,38 @@ 
abstract class TableEnvironment(val config: TableConfig) { /** * Creates a final converter that maps the internal row type to external type. * - * @param physicalRowTypeInfo the input of the sink + * @param physicalTypeInfo the input of the sink * @param logicalRowType the logical type with correct field names (esp. for POJO field mapping) * @param requestedTypeInfo the output type of the sink * @param functionName name of the map function. Must not be unique but has to be a * valid Java class identifier. */ - protected def sinkConversion[T]( - physicalRowTypeInfo: TypeInformation[Row], + protected def getConversionMapper[IN, OUT]( + physicalTypeInfo: TypeInformation[IN], + logicalRowType: RelDataType, + requestedTypeInfo: TypeInformation[OUT], + functionName: String): + Option[MapFunction[IN, OUT]] + + protected def generateRowConverterFunction[OUT]( + inputTypeInfo: TypeInformation[Row], logicalRowType: RelDataType, - requestedTypeInfo: TypeInformation[T], - functionName: String) - : Option[MapFunction[Row, T]] = { + requestedTypeInfo: TypeInformation[OUT], + functionName: String): + GeneratedFunction[MapFunction[Row, OUT], OUT] = { // validate that at least the field types of physical and logical type match // we do that here to make sure that plan translation was correct val logicalRowTypeInfo = FlinkTypeFactory.toInternalRowTypeInfo(logicalRowType) - if (physicalRowTypeInfo != logicalRowTypeInfo) { + if (logicalRowTypeInfo != inputTypeInfo) { throw TableException("The field types of physical and logical row types do not match." + "This is a bug and should not happen. 
Please file an issue.") } - // requested type is a generic Row, no conversion needed - if (requestedTypeInfo.isInstanceOf[GenericTypeInfo[_]] && - requestedTypeInfo.getTypeClass == classOf[Row]) { - return None - } - // convert to type information - val logicalFieldTypes = logicalRowType.getFieldList.asScala map { relDataType => - FlinkTypeFactory.toTypeInfo(relDataType.getType) - } + val logicalFieldTypes = logicalRowType.getFieldList.asScala + .map(t => FlinkTypeFactory.toTypeInfo(t.getType)) + // field names val logicalFieldNames = logicalRowType.getFieldNames.asScala @@ -698,8 +711,8 @@ abstract class TableEnvironment(val config: TableConfig) { if (requestedTypeInfo.getArity != logicalFieldTypes.length) { throw new TableException("Arity of result does not match requested type.") } - requestedTypeInfo match { + requestedTypeInfo match { // POJO type requested case pt: PojoTypeInfo[_] => logicalFieldNames.zip(logicalFieldTypes) foreach { @@ -746,7 +759,7 @@ abstract class TableEnvironment(val config: TableConfig) { val generator = new CodeGenerator( config, false, - physicalRowTypeInfo, + inputTypeInfo, None, None) @@ -760,20 +773,12 @@ abstract class TableEnvironment(val config: TableConfig) { |return ${conversion.resultTerm}; |""".stripMargin - val genFunction = generator.generateFunction( + generator.generateFunction( functionName, - classOf[MapFunction[Row, T]], + classOf[MapFunction[Row, OUT]], body, requestedTypeInfo) - - val mapFunction = new MapRunner[Row, T]( - genFunction.name, - genFunction.code, - genFunction.returnType) - - Some(mapFunction) } - } /** http://git-wip-us.apache.org/repos/asf/flink/blob/856485be/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala index 96a7470..bec52ac 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala @@ -26,21 +26,21 @@ import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.table.api.TableConfig import org.apache.flink.table.calcite.FlinkTypeFactory import org.apache.flink.table.codegen.{CodeGenerator, GeneratedFunction} -import org.apache.flink.table.runtime.FlatMapRunner import org.apache.flink.types.Row import scala.collection.JavaConversions._ import scala.collection.JavaConverters._ -trait CommonCalc { +trait CommonCalc[T] { - private[flink] def functionBody( + private[flink] def generateFunction( generator: CodeGenerator, + ruleDescription: String, inputType: TypeInformation[Row], rowType: RelDataType, calcProgram: RexProgram, - config: TableConfig) - : String = { + config: TableConfig): + GeneratedFunction[FlatMapFunction[Row, Row], Row] = { val returnType = FlinkTypeFactory.toInternalRowTypeInfo(rowType) @@ -53,7 +53,7 @@ trait CommonCalc { expandedExpressions) // only projection - if (condition == null) { + val body = if (condition == null) { s""" |${projection.code} |${generator.collectorTerm}.collect(${projection.resultTerm}); @@ -82,16 +82,12 @@ trait CommonCalc { |""".stripMargin } } - } - - private[flink] def calcMapFunction( - genFunction: GeneratedFunction[FlatMapFunction[Row, Row], Row]) - : RichFlatMapFunction[Row, Row] = { - new FlatMapRunner[Row, Row]( - genFunction.name, - genFunction.code, - genFunction.returnType) + generator.generateFunction( + ruleDescription, + classOf[FlatMapFunction[Row, Row]], + body, + returnType) } private[flink] def conditionToString( 
http://git-wip-us.apache.org/repos/asf/flink/blob/856485be/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala index 6c4066b..83a68c0 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala @@ -23,12 +23,11 @@ import org.apache.calcite.sql.SemiJoinType import org.apache.flink.api.common.functions.FlatMapFunction import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.table.api.{TableConfig, TableException} -import org.apache.flink.table.calcite.FlinkTypeFactory import org.apache.flink.table.codegen.CodeGenUtils.primitiveDefaultValue import org.apache.flink.table.codegen.GeneratedExpression.{ALWAYS_NULL, NO_CODE} import org.apache.flink.table.codegen.{CodeGenerator, GeneratedCollector, GeneratedExpression, GeneratedFunction} import org.apache.flink.table.functions.utils.TableSqlFunction -import org.apache.flink.table.runtime.{CorrelateFlatMapRunner, TableFunctionCollector} +import org.apache.flink.table.runtime.TableFunctionCollector import org.apache.flink.types.Row import scala.collection.JavaConverters._ @@ -36,59 +35,12 @@ import scala.collection.JavaConverters._ /** * Join a user-defined table function */ -trait CommonCorrelate { - - /** - * Creates the [[CorrelateFlatMapRunner]] to execute the join of input table - * and user-defined table function. 
- */ - private[flink] def correlateMapFunction( - config: TableConfig, - inputTypeInfo: TypeInformation[Row], - udtfTypeInfo: TypeInformation[Any], - rowType: RelDataType, - joinType: SemiJoinType, - rexCall: RexCall, - condition: Option[RexNode], - pojoFieldMapping: Option[Array[Int]], // udtf return type pojo field mapping - ruleDescription: String) - : CorrelateFlatMapRunner[Row, Row] = { - - val returnType = FlinkTypeFactory.toInternalRowTypeInfo(rowType) - - val flatMap = generateFunction( - config, - inputTypeInfo, - udtfTypeInfo, - returnType, - rowType, - joinType, - rexCall, - pojoFieldMapping, - ruleDescription) - - val collector = generateCollector( - config, - inputTypeInfo, - udtfTypeInfo, - returnType, - rowType, - condition, - pojoFieldMapping) - - new CorrelateFlatMapRunner[Row, Row]( - flatMap.name, - flatMap.code, - collector.name, - collector.code, - flatMap.returnType) - - } +trait CommonCorrelate[T] { /** * Generates the flat map function to run the user-defined table function. 
*/ - private def generateFunction( + private[flink] def generateFunction( config: TableConfig, inputTypeInfo: TypeInformation[Row], udtfTypeInfo: TypeInformation[Any], @@ -97,8 +49,8 @@ trait CommonCorrelate { joinType: SemiJoinType, rexCall: RexCall, pojoFieldMapping: Option[Array[Int]], - ruleDescription: String) - : GeneratedFunction[FlatMapFunction[Row, Row], Row] = { + ruleDescription: String): + GeneratedFunction[FlatMapFunction[Row, Row], Row] = { val functionGenerator = new CodeGenerator( config, http://git-wip-us.apache.org/repos/asf/flink/blob/856485be/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonScan.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonScan.scala index 0a0d204..5c44525 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonScan.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonScan.scala @@ -21,14 +21,13 @@ package org.apache.flink.table.plan.nodes import org.apache.flink.api.common.functions.MapFunction import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.table.api.TableConfig -import org.apache.flink.table.codegen.CodeGenerator -import org.apache.flink.table.runtime.MapRunner +import org.apache.flink.table.codegen.{CodeGenerator, GeneratedFunction} import org.apache.flink.types.Row /** * Common class for batch and stream scans. */ -trait CommonScan { +trait CommonScan[T] { /** * We check if the input type is exactly the same as the internal row type. 
@@ -36,20 +35,17 @@ trait CommonScan { */ private[flink] def needsConversion( externalTypeInfo: TypeInformation[Any], - internalTypeInfo: TypeInformation[Row]) - : Boolean = { - + internalTypeInfo: TypeInformation[T]): Boolean = externalTypeInfo != internalTypeInfo - } - private[flink] def getConversionMapper( + private[flink] def generateConversionFunction( config: TableConfig, inputType: TypeInformation[Any], expectedType: TypeInformation[Row], conversionOperatorName: String, fieldNames: Seq[String], - inputPojoFieldMapping: Option[Array[Int]] = None) - : MapFunction[Any, Row] = { + inputPojoFieldMapping: Option[Array[Int]] = None): + GeneratedFunction[MapFunction[Any, Row], Row] = { val generator = new CodeGenerator( config, @@ -65,17 +61,11 @@ trait CommonScan { |return ${conversion.resultTerm}; |""".stripMargin - val genFunction = generator.generateFunction( + generator.generateFunction( conversionOperatorName, classOf[MapFunction[Any, Row]], body, expectedType) - - new MapRunner[Any, Row]( - genFunction.name, - genFunction.code, - genFunction.returnType) - } } http://git-wip-us.apache.org/repos/asf/flink/blob/856485be/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchScan.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchScan.scala index b39b8ed..64286f0 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchScan.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchScan.scala @@ -23,12 +23,13 @@ import org.apache.flink.table.api.TableConfig import org.apache.flink.table.calcite.FlinkTypeFactory import org.apache.flink.table.plan.nodes.CommonScan import 
org.apache.flink.table.plan.schema.FlinkTable +import org.apache.flink.table.runtime.MapRunner import org.apache.flink.types.Row import scala.collection.JavaConversions._ import scala.collection.JavaConverters._ -trait BatchScan extends CommonScan with DataSetRel { +trait BatchScan extends CommonScan[Row] with DataSetRel { protected def convertToInternalRow( input: DataSet[Any], @@ -43,7 +44,7 @@ trait BatchScan extends CommonScan with DataSetRel { // conversion if (needsConversion(inputType, internalType)) { - val mapFunc = getConversionMapper( + val function = generateConversionFunction( config, inputType, internalType, @@ -51,6 +52,11 @@ trait BatchScan extends CommonScan with DataSetRel { getRowType.getFieldNames, Some(flinkTable.fieldIndexes)) + val mapFunc = new MapRunner[Any, Row]( + function.name, + function.code, + function.returnType) + val opName = s"from: (${getRowType.getFieldNames.asScala.toList.mkString(", ")})" input.map(mapFunc).name(opName) http://git-wip-us.apache.org/repos/asf/flink/blob/856485be/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala index e05b5a8..359f5a5 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala @@ -24,12 +24,15 @@ import org.apache.calcite.rel.core.Calc import org.apache.calcite.rel.metadata.RelMetadataQuery import org.apache.calcite.rel.{RelNode, RelWriter} import org.apache.calcite.rex._ -import org.apache.flink.api.common.functions.FlatMapFunction +import org.apache.flink.api.common.functions.{FlatMapFunction, 
RichFlatMapFunction} +import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.DataSet +import org.apache.flink.api.java.typeutils.RowTypeInfo import org.apache.flink.table.api.BatchTableEnvironment import org.apache.flink.table.calcite.FlinkTypeFactory -import org.apache.flink.table.codegen.CodeGenerator +import org.apache.flink.table.codegen.{CodeGenerator, GeneratedFunction} import org.apache.flink.table.plan.nodes.CommonCalc +import org.apache.flink.table.runtime.FlatMapRunner import org.apache.flink.types.Row /** @@ -44,7 +47,7 @@ class DataSetCalc( calcProgram: RexProgram, ruleDescription: String) extends Calc(cluster, traitSet, input, calcProgram) - with CommonCalc + with CommonCalc[Row] with DataSetRel { override def deriveRowType(): RelDataType = rowRelDataType @@ -83,24 +86,22 @@ class DataSetCalc( val inputDS = getInput.asInstanceOf[DataSetRel].translateToPlan(tableEnv) - val returnType = FlinkTypeFactory.toInternalRowTypeInfo(getRowType) + val returnType = FlinkTypeFactory + .toInternalRowTypeInfo(getRowType) + .asInstanceOf[RowTypeInfo] val generator = new CodeGenerator(config, false, inputDS.getType) - val body = functionBody( + val genFunction = generateFunction( generator, + ruleDescription, inputDS.getType, getRowType, calcProgram, config) - val genFunction = generator.generateFunction( - ruleDescription, - classOf[FlatMapFunction[Row, Row]], - body, - returnType) + val mapFunc = new FlatMapRunner(genFunction.name, genFunction.code, returnType) - val mapFunc = calcMapFunction(genFunction) inputDS.flatMap(mapFunc).name(calcOpName(calcProgram, getExpressionString)) } } http://git-wip-us.apache.org/repos/asf/flink/blob/856485be/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCorrelate.scala ---------------------------------------------------------------------- diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCorrelate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCorrelate.scala index 2a62e21..1ac6f6f 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCorrelate.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCorrelate.scala @@ -26,9 +26,11 @@ import org.apache.calcite.sql.SemiJoinType import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.DataSet import org.apache.flink.table.api.BatchTableEnvironment +import org.apache.flink.table.calcite.FlinkTypeFactory import org.apache.flink.table.functions.utils.TableSqlFunction import org.apache.flink.table.plan.nodes.CommonCorrelate import org.apache.flink.table.plan.nodes.logical.FlinkLogicalTableFunctionScan +import org.apache.flink.table.runtime.CorrelateFlatMapRunner import org.apache.flink.types.Row /** @@ -45,7 +47,7 @@ class DataSetCorrelate( joinType: SemiJoinType, ruleDescription: String) extends SingleRel(cluster, traitSet, inputNode) - with CommonCorrelate + with CommonCorrelate[Row] with DataSetRel { override def deriveRowType() = relRowType @@ -95,20 +97,38 @@ class DataSetCorrelate( val funcRel = scan.asInstanceOf[FlinkLogicalTableFunctionScan] val rexCall = funcRel.getCall.asInstanceOf[RexCall] val sqlFunction = rexCall.getOperator.asInstanceOf[TableSqlFunction] - val pojoFieldMapping = sqlFunction.getPojoFieldMapping + val pojoFieldMapping = Some(sqlFunction.getPojoFieldMapping) val udtfTypeInfo = sqlFunction.getRowTypeInfo.asInstanceOf[TypeInformation[Any]] - val mapFunc = correlateMapFunction( + val returnType = FlinkTypeFactory.toInternalRowTypeInfo(getRowType) + + val flatMap = generateFunction( config, inputDS.getType, udtfTypeInfo, - getRowType, + returnType, + rowType, joinType, rexCall, - condition, - 
Some(pojoFieldMapping), + pojoFieldMapping, ruleDescription) + val collector = generateCollector( + config, + inputDS.getType, + udtfTypeInfo, + returnType, + rowType, + condition, + pojoFieldMapping) + + val mapFunc = new CorrelateFlatMapRunner[Row, Row]( + flatMap.name, + flatMap.code, + collector.name, + collector.code, + flatMap.returnType) + inputDS.flatMap(mapFunc).name(correlateOpName(rexCall, sqlFunction, relRowType)) } } http://git-wip-us.apache.org/repos/asf/flink/blob/856485be/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetValues.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetValues.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetValues.scala index 3ebee2c..948dd27 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetValues.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetValues.scala @@ -88,7 +88,7 @@ class DataSetValues( generatedRecords.map(_.code), returnType) - val inputFormat = new ValuesInputFormat[Row]( + val inputFormat = new ValuesInputFormat( generatedFunction.name, generatedFunction.code, generatedFunction.returnType) http://git-wip-us.apache.org/repos/asf/flink/blob/856485be/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala index b015a1d..0bf723d 100644 --- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala @@ -24,12 +24,15 @@ import org.apache.calcite.rel.core.Calc import org.apache.calcite.rel.metadata.RelMetadataQuery import org.apache.calcite.rel.{RelNode, RelWriter} import org.apache.calcite.rex.RexProgram -import org.apache.flink.api.common.functions.FlatMapFunction +import org.apache.flink.api.common.functions.{FlatMapFunction, RichFlatMapFunction} +import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.streaming.api.datastream.DataStream import org.apache.flink.table.api.StreamTableEnvironment import org.apache.flink.table.calcite.FlinkTypeFactory -import org.apache.flink.table.codegen.CodeGenerator +import org.apache.flink.table.codegen.{CodeGenerator, GeneratedFunction} import org.apache.flink.table.plan.nodes.CommonCalc +import org.apache.flink.table.runtime.CRowFlatMapRunner +import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo} import org.apache.flink.types.Row /** @@ -44,7 +47,7 @@ class DataStreamCalc( calcProgram: RexProgram, ruleDescription: String) extends Calc(cluster, traitSet, input, calcProgram) - with CommonCalc + with CommonCalc[CRow] with DataStreamRel { override def deriveRowType(): RelDataType = rowRelDataType @@ -75,28 +78,29 @@ class DataStreamCalc( estimateRowCount(calcProgram, rowCnt) } - override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[Row] = { + override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[CRow] = { val config = tableEnv.getConfig val inputDataStream = getInput.asInstanceOf[DataStreamRel].translateToPlan(tableEnv) + val inputRowType = inputDataStream.getType.asInstanceOf[CRowTypeInfo].rowType + val outputRowType = FlinkTypeFactory.toInternalRowTypeInfo(getRowType) - val generator = new CodeGenerator(config, false, 
inputDataStream.getType) + val generator = new CodeGenerator(config, false, inputRowType) - val body = functionBody( + val genFunction = generateFunction( generator, - inputDataStream.getType, + ruleDescription, + inputRowType, getRowType, calcProgram, config) - val genFunction = generator.generateFunction( - ruleDescription, - classOf[FlatMapFunction[Row, Row]], - body, - FlinkTypeFactory.toInternalRowTypeInfo(getRowType)) + val mapFunc = new CRowFlatMapRunner( + genFunction.name, + genFunction.code, + CRowTypeInfo(outputRowType)) - val mapFunc = calcMapFunction(genFunction) inputDataStream.flatMap(mapFunc).name(calcOpName(calcProgram, getExpressionString)) } } http://git-wip-us.apache.org/repos/asf/flink/blob/856485be/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCorrelate.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCorrelate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCorrelate.scala index 7680904..dff5099 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCorrelate.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCorrelate.scala @@ -25,10 +25,12 @@ import org.apache.calcite.sql.SemiJoinType import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.streaming.api.datastream.DataStream import org.apache.flink.table.api.StreamTableEnvironment +import org.apache.flink.table.calcite.FlinkTypeFactory import org.apache.flink.table.functions.utils.TableSqlFunction import org.apache.flink.table.plan.nodes.CommonCorrelate import org.apache.flink.table.plan.nodes.logical.FlinkLogicalTableFunctionScan -import org.apache.flink.types.Row +import 
org.apache.flink.table.runtime.CRowCorrelateFlatMapRunner +import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo} /** * Flink RelNode which matches along with join a user defined table function. @@ -44,7 +46,7 @@ class DataStreamCorrelate( joinType: SemiJoinType, ruleDescription: String) extends SingleRel(cluster, traitSet, input) - with CommonCorrelate + with CommonCorrelate[CRow] with DataStreamRel { override def deriveRowType() = relRowType @@ -79,30 +81,50 @@ class DataStreamCorrelate( .itemIf("condition", condition.orNull, condition.isDefined) } - override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[Row] = { + override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[CRow] = { val config = tableEnv.getConfig // we do not need to specify input type val inputDS = getInput.asInstanceOf[DataStreamRel].translateToPlan(tableEnv) + val inputType = inputDS.getType.asInstanceOf[CRowTypeInfo] val funcRel = scan.asInstanceOf[FlinkLogicalTableFunctionScan] val rexCall = funcRel.getCall.asInstanceOf[RexCall] val sqlFunction = rexCall.getOperator.asInstanceOf[TableSqlFunction] - val pojoFieldMapping = sqlFunction.getPojoFieldMapping + val pojoFieldMapping = Some(sqlFunction.getPojoFieldMapping) val udtfTypeInfo = sqlFunction.getRowTypeInfo.asInstanceOf[TypeInformation[Any]] - val mapFunc = correlateMapFunction( + val rowType = inputType.rowType + val returnType = FlinkTypeFactory.toInternalRowTypeInfo(relRowType) + + val flatMap = generateFunction( config, - inputDS.getType, + rowType, udtfTypeInfo, + returnType, getRowType, joinType, rexCall, - condition, - Some(pojoFieldMapping), + pojoFieldMapping, ruleDescription) + val collector = generateCollector( + config, + rowType, + udtfTypeInfo, + returnType, + getRowType, + condition, + pojoFieldMapping) + + val mapFunc = new CRowCorrelateFlatMapRunner( + flatMap.name, + flatMap.code, + collector.name, + collector.code, + CRowTypeInfo(flatMap.returnType)) + 
inputDS.flatMap(mapFunc).name(correlateOpName(rexCall, sqlFunction, relRowType)) } http://git-wip-us.apache.org/repos/asf/flink/blob/856485be/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala index d88c72b..14ae343 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala @@ -28,8 +28,9 @@ import org.apache.flink.table.calcite.FlinkTypeFactory import org.apache.flink.table.codegen.CodeGenerator import org.apache.flink.table.runtime.aggregate._ import org.apache.flink.table.plan.nodes.CommonAggregate -import org.apache.flink.types.Row +import org.apache.flink.table.plan.rules.datastream.DataStreamRetractionRules import org.apache.flink.table.runtime.aggregate.AggregateUtil.CalcitePair +import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo} /** * @@ -91,11 +92,10 @@ class DataStreamGroupAggregate( .item("select", aggregationToString(inputType, groupings, getRowType, namedAggregates, Nil)) } - override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[Row] = { + override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[CRow] = { val inputDS = input.asInstanceOf[DataStreamRel].translateToPlan(tableEnv) - - val rowTypeInfo = FlinkTypeFactory.toInternalRowTypeInfo(getRowType) + val outRowType = CRowTypeInfo(FlinkTypeFactory.toInternalRowTypeInfo(getRowType)) val generator = new CodeGenerator( tableEnv.getConfig, @@ -117,28 +117,30 @@ class 
DataStreamGroupAggregate( generator, namedAggregates, inputType, - groupings) + groupings, + DataStreamRetractionRules.isAccRetract(this), + DataStreamRetractionRules.isAccRetract(getInput)) - val result: DataStream[Row] = + val result: DataStream[CRow] = // grouped / keyed aggregation if (groupings.nonEmpty) { inputDS .keyBy(groupings: _*) .process(processFunction) - .returns(rowTypeInfo) + .returns(outRowType) .name(keyedAggOpName) - .asInstanceOf[DataStream[Row]] + .asInstanceOf[DataStream[CRow]] } // global / non-keyed aggregation else { inputDS - .keyBy(new NullByteKeySelector[Row]) + .keyBy(new NullByteKeySelector[CRow]) .process(processFunction) .setParallelism(1) .setMaxParallelism(1) - .returns(rowTypeInfo) + .returns(outRowType) .name(nonKeyedAggOpName) - .asInstanceOf[DataStream[Row]] + .asInstanceOf[DataStream[CRow]] } result } http://git-wip-us.apache.org/repos/asf/flink/blob/856485be/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala index 8959b23..d503792 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala @@ -27,7 +27,7 @@ import org.apache.flink.streaming.api.datastream.{AllWindowedStream, DataStream, import org.apache.flink.streaming.api.windowing.assigners._ import org.apache.flink.streaming.api.windowing.time.Time import org.apache.flink.streaming.api.windowing.windows.{Window => DataStreamWindow} -import 
org.apache.flink.table.api.StreamTableEnvironment +import org.apache.flink.table.api.{StreamTableEnvironment, TableException} import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty import org.apache.flink.table.calcite.FlinkTypeFactory import org.apache.flink.table.codegen.CodeGenerator @@ -35,11 +35,12 @@ import org.apache.flink.table.expressions._ import org.apache.flink.table.plan.logical._ import org.apache.flink.table.plan.nodes.CommonAggregate import org.apache.flink.table.plan.nodes.datastream.DataStreamGroupWindowAggregate._ +import org.apache.flink.table.plan.rules.datastream.DataStreamRetractionRules import org.apache.flink.table.runtime.aggregate.AggregateUtil._ import org.apache.flink.table.runtime.aggregate._ +import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo} import org.apache.flink.table.typeutils.TypeCheckUtils.isTimeInterval import org.apache.flink.table.typeutils.{RowIntervalTypeInfo, TimeIntervalTypeInfo} -import org.apache.flink.types.Row class DataStreamGroupWindowAggregate( window: LogicalWindow, @@ -103,11 +104,20 @@ class DataStreamGroupWindowAggregate( namedProperties)) } - override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[Row] = { + override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[CRow] = { val inputDS = input.asInstanceOf[DataStreamRel].translateToPlan(tableEnv) - val rowTypeInfo = FlinkTypeFactory.toInternalRowTypeInfo(getRowType) + val consumeRetraction = DataStreamRetractionRules.isAccRetract(input) + + if (consumeRetraction) { + throw new TableException( + "Retraction on windowed GroupBy aggregation is not supported yet. 
" + + "Note: Windowed GroupBy aggregation should not follow a " + + "non-windowed GroupBy aggregation.") + } + + val outRowType = CRowTypeInfo(FlinkTypeFactory.toInternalRowTypeInfo(getRowType)) val aggString = aggregationToString( inputType, @@ -138,17 +148,19 @@ class DataStreamGroupWindowAggregate( val keyedStream = inputDS.keyBy(grouping: _*) val windowedStream = createKeyedWindowedStream(window, keyedStream) - .asInstanceOf[WindowedStream[Row, Tuple, DataStreamWindow]] + .asInstanceOf[WindowedStream[CRow, Tuple, DataStreamWindow]] val (aggFunction, accumulatorRowType, aggResultRowType) = AggregateUtil.createDataStreamAggregateFunction( generator, namedAggregates, inputType, - rowRelDataType) + rowRelDataType, + grouping + ) windowedStream - .aggregate(aggFunction, windowFunction, accumulatorRowType, aggResultRowType, rowTypeInfo) + .aggregate(aggFunction, windowFunction, accumulatorRowType, aggResultRowType, outRowType) .name(keyedAggOpName) } // global / non-keyed aggregation @@ -160,17 +172,19 @@ class DataStreamGroupWindowAggregate( val windowedStream = createNonKeyedWindowedStream(window, inputDS) - .asInstanceOf[AllWindowedStream[Row, DataStreamWindow]] + .asInstanceOf[AllWindowedStream[CRow, DataStreamWindow]] val (aggFunction, accumulatorRowType, aggResultRowType) = AggregateUtil.createDataStreamAggregateFunction( generator, namedAggregates, inputType, - rowRelDataType) + rowRelDataType, + Array[Int]() + ) windowedStream - .aggregate(aggFunction, windowFunction, accumulatorRowType, aggResultRowType, rowTypeInfo) + .aggregate(aggFunction, windowFunction, accumulatorRowType, aggResultRowType, outRowType) .name(nonKeyedAggOpName) } } @@ -178,9 +192,10 @@ class DataStreamGroupWindowAggregate( object DataStreamGroupWindowAggregate { - - private def createKeyedWindowedStream(groupWindow: LogicalWindow, stream: KeyedStream[Row, Tuple]) - : WindowedStream[Row, Tuple, _ <: DataStreamWindow] = groupWindow match { + private def createKeyedWindowedStream( + 
groupWindow: LogicalWindow, + stream: KeyedStream[CRow, Tuple]): + WindowedStream[CRow, Tuple, _ <: DataStreamWindow] = groupWindow match { case ProcessingTimeTumblingGroupWindow(_, size) if isTimeInterval(size.resultType) => stream.window(TumblingProcessingTimeWindows.of(asTime(size))) @@ -221,8 +236,10 @@ object DataStreamGroupWindowAggregate { stream.window(EventTimeSessionWindows.withGap(asTime(gap))) } - private def createNonKeyedWindowedStream(groupWindow: LogicalWindow, stream: DataStream[Row]) - : AllWindowedStream[Row, _ <: DataStreamWindow] = groupWindow match { + private def createNonKeyedWindowedStream( + groupWindow: LogicalWindow, + stream: DataStream[CRow]): + AllWindowedStream[CRow, _ <: DataStreamWindow] = groupWindow match { case ProcessingTimeTumblingGroupWindow(_, size) if isTimeInterval(size.resultType) => stream.windowAll(TumblingProcessingTimeWindows.of(asTime(size))) http://git-wip-us.apache.org/repos/asf/flink/blob/856485be/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala index 031d533..a0d10a2 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala @@ -25,18 +25,17 @@ import org.apache.calcite.rel.core.{AggregateCall, Window} import org.apache.calcite.rel.core.Window.Group import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel} import org.apache.calcite.rel.RelFieldCollation.Direction.ASCENDING -import org.apache.flink.api.java.typeutils.RowTypeInfo 
import org.apache.flink.streaming.api.datastream.DataStream import org.apache.flink.table.api.{StreamTableEnvironment, TableException} import org.apache.flink.table.calcite.FlinkTypeFactory import org.apache.flink.table.plan.nodes.OverAggregate import org.apache.flink.table.runtime.aggregate._ -import org.apache.flink.types.Row - import org.apache.flink.api.java.functions.NullByteKeySelector import org.apache.flink.table.codegen.CodeGenerator import org.apache.flink.table.functions.{ProcTimeType, RowTimeType} +import org.apache.flink.table.plan.rules.datastream.DataStreamRetractionRules import org.apache.flink.table.runtime.aggregate.AggregateUtil.CalcitePair +import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo} class DataStreamOverAggregate( logicWindow: Window, @@ -87,7 +86,7 @@ class DataStreamOverAggregate( namedAggregates)) } - override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[Row] = { + override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[CRow] = { if (logicWindow.groups.size > 1) { throw new TableException( "Unsupported use of OVER windows. All aggregates must be computed on the same window.") @@ -110,6 +109,8 @@ class DataStreamOverAggregate( val inputDS = input.asInstanceOf[DataStreamRel].translateToPlan(tableEnv) + val consumeRetraction = DataStreamRetractionRules.isAccRetract(input) + val generator = new CodeGenerator( tableEnv.getConfig, false, @@ -120,6 +121,12 @@ class DataStreamOverAggregate( .get(orderKey.getFieldIndex) .getValue + if (consumeRetraction) { + throw new TableException( + "Retraction on Over window aggregation is not supported yet. 
" + + "Note: Over window aggregation should not follow a non-windowed GroupBy aggregation.") + } + timeType match { case _: ProcTimeType => // proc-time OVER window @@ -138,8 +145,7 @@ class DataStreamOverAggregate( generator, inputDS, isRowTimeType = false, - isRowsClause = overWindow.isRows - ) + isRowsClause = overWindow.isRows) } else { throw new TableException( "OVER RANGE FOLLOWING windows are not supported yet.") @@ -153,16 +159,14 @@ class DataStreamOverAggregate( generator, inputDS, isRowTimeType = true, - isRowsClause = overWindow.isRows - ) + isRowsClause = overWindow.isRows) } else if (overWindow.lowerBound.isPreceding && overWindow.upperBound.isCurrentRow) { // bounded OVER window createBoundedAndCurrentRowOverWindow( generator, inputDS, isRowTimeType = true, - isRowsClause = overWindow.isRows - ) + isRowsClause = overWindow.isRows) } else { throw new TableException( "OVER RANGE FOLLOWING windows are not supported yet.") @@ -177,16 +181,16 @@ class DataStreamOverAggregate( def createUnboundedAndCurrentRowOverWindow( generator: CodeGenerator, - inputDS: DataStream[Row], + inputDS: DataStream[CRow], isRowTimeType: Boolean, - isRowsClause: Boolean): DataStream[Row] = { + isRowsClause: Boolean): DataStream[CRow] = { val overWindow: Group = logicWindow.groups.get(0) val partitionKeys: Array[Int] = overWindow.keys.toArray val namedAggregates: Seq[CalcitePair[AggregateCall, String]] = generateNamedAggregates // get the output types - val rowTypeInfo = FlinkTypeFactory.toInternalRowTypeInfo(getRowType).asInstanceOf[RowTypeInfo] + val returnTypeInfo = CRowTypeInfo(FlinkTypeFactory.toInternalRowTypeInfo(getRowType)) val processFunction = AggregateUtil.createUnboundedOverProcessFunction( generator, @@ -196,30 +200,28 @@ class DataStreamOverAggregate( partitionKeys.nonEmpty, isRowsClause) - val result: DataStream[Row] = + val result: DataStream[CRow] = // partitioned aggregation if (partitionKeys.nonEmpty) { inputDS .keyBy(partitionKeys: _*) 
.process(processFunction) - .returns(rowTypeInfo) + .returns(returnTypeInfo) .name(aggOpName) - .asInstanceOf[DataStream[Row]] + .asInstanceOf[DataStream[CRow]] } // non-partitioned aggregation else { if (isRowTimeType) { - inputDS.keyBy(new NullByteKeySelector[Row]) + inputDS.keyBy(new NullByteKeySelector[CRow]) .process(processFunction).setParallelism(1).setMaxParallelism(1) - .returns(rowTypeInfo) + .returns(returnTypeInfo) .name(aggOpName) - .asInstanceOf[DataStream[Row]] } else { inputDS .process(processFunction).setParallelism(1).setMaxParallelism(1) - .returns(rowTypeInfo) + .returns(returnTypeInfo) .name(aggOpName) - .asInstanceOf[DataStream[Row]] } } result @@ -227,9 +229,9 @@ class DataStreamOverAggregate( def createBoundedAndCurrentRowOverWindow( generator: CodeGenerator, - inputDS: DataStream[Row], + inputDS: DataStream[CRow], isRowTimeType: Boolean, - isRowsClause: Boolean): DataStream[Row] = { + isRowsClause: Boolean): DataStream[CRow] = { val overWindow: Group = logicWindow.groups.get(0) val partitionKeys: Array[Int] = overWindow.keys.toArray @@ -239,9 +241,9 @@ class DataStreamOverAggregate( getLowerBoundary(logicWindow, overWindow, getInput()) + (if (isRowsClause) 1 else 0) // get the output types - val rowTypeInfo = FlinkTypeFactory.toInternalRowTypeInfo(getRowType).asInstanceOf[RowTypeInfo] + val returnTypeInfo = CRowTypeInfo(FlinkTypeFactory.toInternalRowTypeInfo(getRowType)) - val processFunction = AggregateUtil.createBoundedOverProcessFunction( + val processFunction = AggregateUtil.createBoundedOverProcessFunction[CRow]( generator, namedAggregates, inputType, @@ -249,24 +251,22 @@ class DataStreamOverAggregate( isRowsClause, isRowTimeType ) - val result: DataStream[Row] = + val result: DataStream[CRow] = // partitioned aggregation if (partitionKeys.nonEmpty) { inputDS .keyBy(partitionKeys: _*) .process(processFunction) - .returns(rowTypeInfo) + .returns(returnTypeInfo) .name(aggOpName) - .asInstanceOf[DataStream[Row]] } // non-partitioned 
aggregation else { inputDS - .keyBy(new NullByteKeySelector[Row]) + .keyBy(new NullByteKeySelector[CRow]) .process(processFunction).setParallelism(1).setMaxParallelism(1) - .returns(rowTypeInfo) + .returns(returnTypeInfo) .name(aggOpName) - .asInstanceOf[DataStream[Row]] } result } http://git-wip-us.apache.org/repos/asf/flink/blob/856485be/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRel.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRel.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRel.scala index 50d1d06..5c009a1 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRel.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRel.scala @@ -21,7 +21,7 @@ package org.apache.flink.table.plan.nodes.datastream import org.apache.flink.streaming.api.datastream.DataStream import org.apache.flink.table.api.StreamTableEnvironment import org.apache.flink.table.plan.nodes.FlinkRelNode -import org.apache.flink.types.Row +import org.apache.flink.table.runtime.types.CRow trait DataStreamRel extends FlinkRelNode { @@ -29,9 +29,9 @@ trait DataStreamRel extends FlinkRelNode { * Translates the FlinkRelNode into a Flink operator. * * @param tableEnv The [[StreamTableEnvironment]] of the translated Table. 
- * @return DataStream of type [[Row]] + * @return DataStream of type [[CRow]] */ - def translateToPlan(tableEnv: StreamTableEnvironment) : DataStream[Row] + def translateToPlan(tableEnv: StreamTableEnvironment) : DataStream[CRow] /** * Whether the [[DataStreamRel]] requires that update and delete changes are sent with retraction http://git-wip-us.apache.org/repos/asf/flink/blob/856485be/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamScan.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamScan.scala index c187ae8..1a43edb 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamScan.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamScan.scala @@ -25,6 +25,7 @@ import org.apache.calcite.rel.core.TableScan import org.apache.flink.streaming.api.datastream.DataStream import org.apache.flink.table.api.StreamTableEnvironment import org.apache.flink.table.plan.schema.DataStreamTable +import org.apache.flink.table.runtime.types.CRow import org.apache.flink.types.Row /** @@ -53,7 +54,7 @@ class DataStreamScan( ) } - override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[Row] = { + override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[CRow] = { val config = tableEnv.getConfig val inputDataStream: DataStream[Any] = dataStreamTable.dataStream convertToInternalRow(inputDataStream, dataStreamTable, config) http://git-wip-us.apache.org/repos/asf/flink/blob/856485be/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamUnion.scala 
---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamUnion.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamUnion.scala index f340ac7..68e4e6d 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamUnion.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamUnion.scala @@ -23,6 +23,7 @@ import org.apache.calcite.rel.`type`.RelDataType import org.apache.calcite.rel.{BiRel, RelNode, RelWriter} import org.apache.flink.streaming.api.datastream.DataStream import org.apache.flink.table.api.StreamTableEnvironment +import org.apache.flink.table.runtime.types.CRow import org.apache.flink.types.Row import scala.collection.JavaConverters._ @@ -60,7 +61,7 @@ class DataStreamUnion( s"Union All(union: (${getRowType.getFieldNames.asScala.toList.mkString(", ")}))" } - override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[Row] = { + override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[CRow] = { val leftDataSet = left.asInstanceOf[DataStreamRel].translateToPlan(tableEnv) val rightDataSet = right.asInstanceOf[DataStreamRel].translateToPlan(tableEnv) http://git-wip-us.apache.org/repos/asf/flink/blob/856485be/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamValues.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamValues.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamValues.scala index 0ab4a48..5e8d127 100644 --- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamValues.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamValues.scala @@ -28,8 +28,8 @@ import org.apache.flink.streaming.api.datastream.DataStream import org.apache.flink.table.api.StreamTableEnvironment import org.apache.flink.table.calcite.FlinkTypeFactory import org.apache.flink.table.codegen.CodeGenerator -import org.apache.flink.table.runtime.io.ValuesInputFormat -import org.apache.flink.types.Row +import org.apache.flink.table.runtime.io.{CRowValuesInputFormat} +import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo} import scala.collection.JavaConverters._ @@ -57,11 +57,11 @@ class DataStreamValues( ) } - override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[Row] = { + override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[CRow] = { val config = tableEnv.getConfig - val returnType = FlinkTypeFactory.toInternalRowTypeInfo(getRowType) + val returnType = CRowTypeInfo(FlinkTypeFactory.toInternalRowTypeInfo(getRowType)) val generator = new CodeGenerator(config) @@ -77,12 +77,12 @@ class DataStreamValues( val generatedFunction = generator.generateValuesInputFormat( ruleDescription, generatedRecords.map(_.code), - returnType) + returnType.rowType) - val inputFormat = new ValuesInputFormat[Row]( + val inputFormat = new CRowValuesInputFormat( generatedFunction.name, generatedFunction.code, - generatedFunction.returnType) + returnType) tableEnv.execEnv.createInput(inputFormat, returnType) } http://git-wip-us.apache.org/repos/asf/flink/blob/856485be/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamScan.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamScan.scala 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamScan.scala index 6d08302..02716cc 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamScan.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamScan.scala @@ -23,41 +23,46 @@ import org.apache.flink.table.api.TableConfig import org.apache.flink.table.calcite.FlinkTypeFactory import org.apache.flink.table.plan.nodes.CommonScan import org.apache.flink.table.plan.schema.FlinkTable -import org.apache.flink.types.Row +import org.apache.flink.table.runtime.CRowOutputMapRunner +import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo} import scala.collection.JavaConversions._ import scala.collection.JavaConverters._ -trait StreamScan extends CommonScan with DataStreamRel { +trait StreamScan extends CommonScan[CRow] with DataStreamRel { protected def convertToInternalRow( input: DataStream[Any], flinkTable: FlinkTable[_], config: TableConfig) - : DataStream[Row] = { + : DataStream[CRow] = { val inputType = input.getType - - val internalType = FlinkTypeFactory.toInternalRowTypeInfo(getRowType) + val internalType = CRowTypeInfo(FlinkTypeFactory.toInternalRowTypeInfo(getRowType)) // conversion if (needsConversion(inputType, internalType)) { - val mapFunc = getConversionMapper( + val function = generateConversionFunction( config, inputType, - internalType, + internalType.rowType, "DataStreamSourceConversion", getRowType.getFieldNames, Some(flinkTable.fieldIndexes)) + val mapFunc = new CRowOutputMapRunner( + function.name, + function.code, + internalType) + val opName = s"from: (${getRowType.getFieldNames.asScala.toList.mkString(", ")})" - input.map(mapFunc).name(opName) + input.map(mapFunc).name(opName).returns(internalType) } // no conversion necessary, forward else { - input.asInstanceOf[DataStream[Row]] + input.asInstanceOf[DataStream[CRow]] } } } 
http://git-wip-us.apache.org/repos/asf/flink/blob/856485be/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala index 0a466a3..44ad706 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala @@ -25,8 +25,8 @@ import org.apache.flink.streaming.api.datastream.DataStream import org.apache.flink.table.api.StreamTableEnvironment import org.apache.flink.table.plan.nodes.PhysicalTableSourceScan import org.apache.flink.table.plan.schema.TableSourceTable +import org.apache.flink.table.runtime.types.CRow import org.apache.flink.table.sources.{StreamTableSource, TableSource} -import org.apache.flink.types.Row /** Flink RelNode to read data from an external source defined by a [[StreamTableSource]]. 
*/ class StreamTableSourceScan( @@ -64,7 +64,7 @@ class StreamTableSourceScan( ) } - override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[Row] = { + override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[CRow] = { val config = tableEnv.getConfig val inputDataStream = tableSource.getDataStream(tableEnv.execEnv).asInstanceOf[DataStream[Any]] convertToInternalRow(inputDataStream, new TableSourceTable(tableSource), config) http://git-wip-us.apache.org/repos/asf/flink/blob/856485be/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalCalc.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalCalc.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalCalc.scala index ec90392..0ca079e 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalCalc.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalCalc.scala @@ -34,7 +34,7 @@ class FlinkLogicalCalc( calcProgram: RexProgram) extends Calc(cluster, traitSet, input, calcProgram) with FlinkLogicalRel - with CommonCalc { + with CommonCalc[Any] { override def copy(traitSet: RelTraitSet, child: RelNode, program: RexProgram): Calc = { new FlinkLogicalCalc(cluster, traitSet, child, program) http://git-wip-us.apache.org/repos/asf/flink/blob/856485be/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamRetractionRules.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamRetractionRules.scala 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamRetractionRules.scala index aeb67b6..bd9a7ee 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamRetractionRules.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamRetractionRules.scala @@ -82,6 +82,14 @@ object DataStreamRetractionRules { } /** + * Checks if a [[RelNode]] is in [[AccMode.AccRetract]] mode. + */ + def isAccRetract(node: RelNode): Boolean = { + val accModeTrait = node.getTraitSet.getTrait(AccModeTraitDef.INSTANCE) + null != accModeTrait && accModeTrait.getAccMode == AccMode.AccRetract + } + + /** * Rule that assigns the default retraction information to [[DataStreamRel]] nodes. * The default is to not publish updates as retraction messages and [[AccMode.Acc]]. */ @@ -190,14 +198,6 @@ object DataStreamRetractionRules { } /** - * Checks if a [[RelNode]] is in [[AccMode.AccRetract]] mode. - */ - def isAccRetract(node: RelNode): Boolean = { - val accModeTrait = node.getTraitSet.getTrait(AccModeTraitDef.INSTANCE) - null != accModeTrait && accModeTrait.getAccMode == AccMode.AccRetract - } - - /** * Set [[AccMode.AccRetract]] to a [[RelNode]]. 
*/ def setAccRetract(relNode: RelNode): RelNode = { http://git-wip-us.apache.org/repos/asf/flink/blob/856485be/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowCorrelateFlatMapRunner.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowCorrelateFlatMapRunner.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowCorrelateFlatMapRunner.scala new file mode 100644 index 0000000..66e51b1 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowCorrelateFlatMapRunner.scala @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
package org.apache.flink.table.runtime

import org.apache.flink.api.common.functions.{FlatMapFunction, RichFlatMapFunction}
import org.apache.flink.api.common.functions.util.FunctionUtils
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.ResultTypeQueryable
import org.apache.flink.configuration.Configuration
import org.apache.flink.table.codegen.Compiler
import org.apache.flink.table.runtime.types.CRow
import org.apache.flink.types.Row
import org.apache.flink.util.Collector
import org.slf4j.{Logger, LoggerFactory}

/**
  * Correlate flat-map runner that consumes [[CRow]] records and emits [[CRow]] records.
  *
  * The table function collector and the flat-map function are both compiled from source
  * code when the operator is opened. For every input, the [[Row]] payload is handed to the
  * compiled function while the input's change flag is passed to a [[CRowWrappingCollector]]
  * placed in front of the downstream collector.
  *
  * @param flatMapName   class name of the generated flat-map function
  * @param flatMapCode   source code of the generated flat-map function
  * @param collectorName class name of the generated table function collector
  * @param collectorCode source code of the generated table function collector
  * @param returnType    type information reported by [[getProducedType]]
  */
class CRowCorrelateFlatMapRunner(
    flatMapName: String,
    flatMapCode: String,
    collectorName: String,
    collectorCode: String,
    @transient returnType: TypeInformation[CRow])
  extends RichFlatMapFunction[CRow, CRow]
  with ResultTypeQueryable[CRow]
  with Compiler[Any] {

  val LOG: Logger = LoggerFactory.getLogger(this.getClass)

  // All three members are created in open(), i.e. on the task that runs the operator.
  private var joinFunction: FlatMapFunction[Row, Row] = _
  private var tableFunctionCollector: TableFunctionCollector[_] = _
  private var changeForwarder: CRowWrappingCollector = _

  /**
    * Compiles and instantiates the collector and the flat-map function, then hands the
    * runtime context to the flat-map function and opens it.
    */
  override def open(parameters: Configuration): Unit = {
    LOG.debug(s"Compiling TableFunctionCollector: $collectorName \n\n Code:\n$collectorCode")
    val collectorClass =
      compile(getRuntimeContext.getUserCodeClassLoader, collectorName, collectorCode)
    LOG.debug("Instantiating TableFunctionCollector.")
    val collectorInstance = collectorClass.newInstance()
    tableFunctionCollector = collectorInstance.asInstanceOf[TableFunctionCollector[_]]
    changeForwarder = new CRowWrappingCollector()

    LOG.debug(s"Compiling FlatMapFunction: $flatMapName \n\n Code:\n$flatMapCode")
    val functionClass =
      compile(getRuntimeContext.getUserCodeClassLoader, flatMapName, flatMapCode)
    // The generated function is constructed with the collector as its only argument.
    val collectorArgConstructor =
      functionClass.getConstructor(classOf[TableFunctionCollector[_]])
    LOG.debug("Instantiating FlatMapFunction.")
    val functionInstance = collectorArgConstructor.newInstance(tableFunctionCollector)
    joinFunction = functionInstance.asInstanceOf[FlatMapFunction[Row, Row]]
    FunctionUtils.setFunctionRuntimeContext(joinFunction, getRuntimeContext)
    FunctionUtils.openFunction(joinFunction, parameters)
  }

  /**
    * Evaluates the compiled function on the [[Row]] of `in`; results are emitted to `out`
    * through the change-forwarding collector.
    */
  override def flatMap(in: CRow, out: Collector[CRow]): Unit = {
    val inputRow = in.row

    // Point the wrapper at the current output and hand it the input's change flag.
    changeForwarder.out = out
    changeForwarder.setChange(in.change)

    // Prepare the table function collector for this input record.
    tableFunctionCollector.setCollector(changeForwarder)
    tableFunctionCollector.setInput(inputRow)
    tableFunctionCollector.reset()

    joinFunction.flatMap(inputRow, changeForwarder)
  }

  override def getProducedType: TypeInformation[CRow] = returnType

  /** Closes the compiled flat-map function. */
  override def close(): Unit = FunctionUtils.closeFunction(joinFunction)
}
package org.apache.flink.table.runtime

import org.apache.flink.api.common.functions.util.FunctionUtils
import org.apache.flink.api.common.functions.{FlatMapFunction, RichFlatMapFunction}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.ResultTypeQueryable
import org.apache.flink.configuration.Configuration
import org.apache.flink.table.codegen.Compiler
import org.apache.flink.table.runtime.types.CRow
import org.apache.flink.types.Row
import org.apache.flink.util.Collector
import org.slf4j.LoggerFactory

/**
  * Flat-map runner that consumes [[CRow]] records and emits [[CRow]] records.
  *
  * The wrapped [[FlatMapFunction]] works on plain [[Row]]s. It is compiled from source code
  * when the operator is opened; the change flag of each input is passed to a
  * [[CRowWrappingCollector]] that sits in front of the downstream collector.
  *
  * @param name       class name of the generated flat-map function
  * @param code       source code of the generated flat-map function
  * @param returnType type information reported by [[getProducedType]]
  */
class CRowFlatMapRunner(
    name: String,
    code: String,
    @transient returnType: TypeInformation[CRow])
  extends RichFlatMapFunction[CRow, CRow]
  with ResultTypeQueryable[CRow]
  with Compiler[FlatMapFunction[Row, Row]] {

  val LOG = LoggerFactory.getLogger(this.getClass)

  // Both members are created in open(), i.e. on the task that runs the operator.
  private var rowFunction: FlatMapFunction[Row, Row] = _
  private var changeForwarder: CRowWrappingCollector = _

  /**
    * Compiles and instantiates the generated function, passes it the runtime context,
    * opens it and creates the change-forwarding collector.
    */
  override def open(parameters: Configuration): Unit = {
    LOG.debug(s"Compiling FlatMapFunction: $name \n\n Code:\n$code")
    val functionClass = compile(getRuntimeContext.getUserCodeClassLoader, name, code)
    LOG.debug("Instantiating FlatMapFunction.")
    rowFunction = functionClass.newInstance()
    FunctionUtils.setFunctionRuntimeContext(rowFunction, getRuntimeContext)
    FunctionUtils.openFunction(rowFunction, parameters)

    changeForwarder = new CRowWrappingCollector()
  }

  /**
    * Evaluates the compiled function on the [[Row]] of `in`; results are emitted to `out`
    * through the change-forwarding collector.
    */
  override def flatMap(in: CRow, out: Collector[CRow]): Unit = {
    // Point the wrapper at the current output and hand it the input's change flag.
    changeForwarder.out = out
    changeForwarder.setChange(in.change)

    val inputRow = in.row
    rowFunction.flatMap(inputRow, changeForwarder)
  }

  override def getProducedType: TypeInformation[CRow] = returnType

  /** Closes the compiled flat-map function. */
  override def close(): Unit = FunctionUtils.closeFunction(rowFunction)
}
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.table.runtime

import org.apache.flink.api.common.functions.util.FunctionUtils
import org.apache.flink.api.common.functions.{MapFunction, RichMapFunction}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.ResultTypeQueryable
import org.apache.flink.configuration.Configuration
import org.apache.flink.table.codegen.Compiler
import org.apache.flink.table.runtime.types.CRow
import org.apache.flink.types.Row
import org.slf4j.{Logger, LoggerFactory}

/**
  * MapRunner with [[CRow]] input.
  *
  * The wrapped [[MapFunction]] is compiled from source code when the operator is opened and
  * is applied to the [[Row]] carried by each incoming [[CRow]]; the change flag of the input
  * is not passed on to the function.
  *
  * @param name       class name of the generated map function
  * @param code       source code of the generated map function
  * @param returnType type information reported by [[getProducedType]]
  * @tparam OUT type of the records produced by the generated map function
  */
class CRowInputMapRunner[OUT](
    name: String,
    code: String,
    @transient returnType: TypeInformation[OUT])
  extends RichMapFunction[CRow, OUT]
  with ResultTypeQueryable[OUT]
  with Compiler[MapFunction[Row, OUT]] {

  val LOG: Logger = LoggerFactory.getLogger(this.getClass)

  // Created in open(), i.e. on the task that runs the operator.
  private var function: MapFunction[Row, OUT] = _

  /**
    * Compiles and instantiates the generated function, then passes it the runtime context
    * and opens it.
    */
  override def open(parameters: Configuration): Unit = {
    LOG.debug(s"Compiling MapFunction: $name \n\n Code:\n$code")
    val clazz = compile(getRuntimeContext.getUserCodeClassLoader, name, code)
    LOG.debug("Instantiating MapFunction.")
    function = clazz.newInstance()
    // Drive the lifecycle of the wrapped function in the same way the flat-map runners do.
    // NOTE(review): FunctionUtils is expected to ignore functions that are not rich
    // functions, which would make these calls no-ops for plain MapFunctions - confirm.
    FunctionUtils.setFunctionRuntimeContext(function, getRuntimeContext)
    FunctionUtils.openFunction(function, parameters)
  }

  /** Applies the compiled function to the [[Row]] of `in`. */
  override def map(in: CRow): OUT = function.map(in.row)

  override def getProducedType: TypeInformation[OUT] = returnType

  /** Closes the compiled map function. */
  override def close(): Unit = {
    FunctionUtils.closeFunction(function)
  }
}
package org.apache.flink.table.runtime

import org.apache.flink.api.common.functions.util.FunctionUtils
import org.apache.flink.api.common.functions.{MapFunction, RichMapFunction}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.ResultTypeQueryable
import org.apache.flink.configuration.Configuration
import org.apache.flink.table.codegen.Compiler
import org.apache.flink.table.runtime.types.CRow
import org.apache.flink.types.Row
import org.slf4j.{Logger, LoggerFactory}

/**
  * MapRunner with [[CRow]] output.
  *
  * The wrapped [[MapFunction]] is compiled from source code when the operator is opened and
  * converts each input into a [[Row]]. The result is stored in a single [[CRow]] instance
  * that is reused for every record, so callers must not hold on to a returned object across
  * invocations of [[map]].
  *
  * @param name       class name of the generated map function
  * @param code       source code of the generated map function
  * @param returnType type information reported by [[getProducedType]]
  */
class CRowOutputMapRunner(
    name: String,
    code: String,
    @transient returnType: TypeInformation[CRow])
  extends RichMapFunction[Any, CRow]
  with ResultTypeQueryable[CRow]
  with Compiler[MapFunction[Any, Row]] {

  val LOG: Logger = LoggerFactory.getLogger(this.getClass)

  // Both members are created in open(), i.e. on the task that runs the operator.
  private var function: MapFunction[Any, Row] = _
  private var outCRow: CRow = _

  /**
    * Compiles and instantiates the generated function, passes it the runtime context,
    * opens it and allocates the reusable output record.
    */
  override def open(parameters: Configuration): Unit = {
    LOG.debug(s"Compiling MapFunction: $name \n\n Code:\n$code")
    val clazz = compile(getRuntimeContext.getUserCodeClassLoader, name, code)
    LOG.debug("Instantiating MapFunction.")
    function = clazz.newInstance()
    // Drive the lifecycle of the wrapped function in the same way the flat-map runners do.
    // NOTE(review): FunctionUtils is expected to ignore functions that are not rich
    // functions, which would make these calls no-ops for plain MapFunctions - confirm.
    FunctionUtils.setFunctionRuntimeContext(function, getRuntimeContext)
    FunctionUtils.openFunction(function, parameters)
    // The second constructor argument is never changed afterwards; presumably it is the
    // change flag, so every emitted record would be an accumulate message - confirm.
    outCRow = new CRow(null, true)
  }

  /** Converts `in` with the compiled function and returns it inside the reused [[CRow]]. */
  override def map(in: Any): CRow = {
    outCRow.row = function.map(in)
    outCRow
  }

  override def getProducedType: TypeInformation[CRow] = returnType

  /** Closes the compiled map function. */
  override def close(): Unit = {
    FunctionUtils.closeFunction(function)
  }
}