[FLINK-6091] [table] Implement and turn on retraction for non-windowed aggregates.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/27bf4cab Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/27bf4cab Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/27bf4cab Branch: refs/heads/master Commit: 27bf4cab7141ccbc7e8effe03559c50bbb3f9707 Parents: dc54abc Author: Hequn Cheng <chenghe...@gmail.com> Authored: Tue Apr 18 16:54:09 2017 +0800 Committer: Fabian Hueske <fhue...@apache.org> Committed: Sat May 6 01:51:55 2017 +0200 ---------------------------------------------------------------------- .../flink/table/api/BatchTableEnvironment.scala | 63 +++++- .../table/api/StreamTableEnvironment.scala | 64 +++++- .../flink/table/api/TableEnvironment.scala | 65 +++--- .../flink/table/plan/nodes/CommonCalc.scala | 32 ++- .../table/plan/nodes/CommonCorrelate.scala | 139 +++++-------- .../flink/table/plan/nodes/CommonScan.scala | 7 +- .../table/plan/nodes/dataset/BatchScan.scala | 2 +- .../table/plan/nodes/dataset/DataSetCalc.scala | 16 +- .../plan/nodes/dataset/DataSetCorrelate.scala | 29 ++- .../plan/nodes/dataset/DataSetValues.scala | 2 +- .../plan/nodes/datastream/DataStreamCalc.scala | 26 +-- .../nodes/datastream/DataStreamCorrelate.scala | 40 +++- .../datastream/DataStreamGroupAggregate.scala | 24 ++- .../DataStreamGroupWindowAggregate.scala | 52 +++-- .../datastream/DataStreamOverAggregate.scala | 72 ++++--- .../plan/nodes/datastream/DataStreamRel.scala | 6 +- .../plan/nodes/datastream/DataStreamScan.scala | 7 +- .../plan/nodes/datastream/DataStreamUnion.scala | 4 +- .../nodes/datastream/DataStreamValues.scala | 13 +- .../plan/nodes/datastream/StreamScan.scala | 27 ++- .../datastream/StreamTableSourceScan.scala | 8 +- .../plan/nodes/logical/FlinkLogicalCalc.scala | 2 +- .../datastream/DataStreamRetractionRules.scala | 16 +- .../runtime/CRowCorrelateFlatMapRunner.scala | 83 ++++++++ .../flink/table/runtime/CRowFlatMapRunner.scala | 72 +++++++ .../table/runtime/CRowInputMapRunner.scala | 57 ++++++ .../table/runtime/CRowOutputMapRunner.scala | 60 ++++++ .../table/runtime/CRowWrappingCollector.scala | 41 ++++ .../flink/table/runtime/FlatMapRunner.scala | 17 +- .../aggregate/AggregateAggFunction.scala | 15 +- .../table/runtime/aggregate/AggregateUtil.scala | 48 +++-- ...SetSessionWindowAggReduceGroupFunction.scala | 4 +- ...taSetSlideWindowAggReduceGroupFunction.scala | 4 +- ...TumbleTimeWindowAggReduceGroupFunction.scala | 4 +- .../aggregate/GroupAggProcessFunction.scala | 58 ++++-- ...rementalAggregateAllTimeWindowFunction.scala | 7 +- .../IncrementalAggregateAllWindowFunction.scala | 11 +- ...IncrementalAggregateTimeWindowFunction.scala | 7 +- .../IncrementalAggregateWindowFunction.scala | 13 +- .../aggregate/ProcTimeBoundedRangeOver.scala | 30 +-- .../aggregate/ProcTimeBoundedRowsOver.scala | 26 ++- .../ProcTimeUnboundedNonPartitionedOver.scala | 23 ++- .../ProcTimeUnboundedPartitionedOver.scala | 22 +- .../aggregate/RowTimeBoundedRangeOver.scala | 36 ++-- .../aggregate/RowTimeBoundedRowsOver.scala | 30 +-- .../aggregate/RowTimeUnboundedOver.scala | 50 ++--- .../aggregate/TimeWindowPropertyCollector.scala | 34 ++- .../runtime/io/CRowValuesInputFormat.scala | 59 ++++++ .../table/runtime/io/ValuesInputFormat.scala | 17 +- .../apache/flink/table/runtime/types/CRow.scala | 55 +++++ .../table/runtime/types/CRowComparator.scala | 83 ++++++++ .../table/runtime/types/CRowSerializer.scala | 78 +++++++ .../table/runtime/types/CRowTypeInfo.scala | 98 +++++++++ .../apache/flink/table/sinks/CsvTableSink.scala | 2 + .../flink/table/TableEnvironmentTest.scala | 72 ++++++- .../scala/batch/TableEnvironmentITCase.scala | 20 ++ .../api/scala/stream/RetractionITCase.scala | 205 +++++++++++++++++++ .../api/scala/stream/TableSinkITCase.scala | 2 +- .../api/scala/stream/utils/StreamITCase.scala | 11 +- ...ProcessingOverRangeProcessFunctionTest.scala | 105 +++++----- .../runtime/types/CRowComparatorTest.scala | 61 ++++++ .../table/utils/MockTableEnvironment.scala | 8 + 62 files changed, 1813 insertions(+), 531 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/27bf4cab/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala index 02c6063..f7955f0 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala @@ -26,16 +26,20 @@ import org.apache.calcite.rel.RelNode import org.apache.calcite.rel.`type`.RelDataType import org.apache.calcite.sql2rel.RelDecorrelator import org.apache.calcite.tools.RuleSet +import org.apache.flink.api.common.functions.{MapFunction, RichMapFunction} import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.io.DiscardingOutputFormat import org.apache.flink.api.java.typeutils.GenericTypeInfo import org.apache.flink.api.java.{DataSet, ExecutionEnvironment} +import org.apache.flink.configuration.Configuration import org.apache.flink.table.explain.PlanJsonParser import org.apache.flink.table.expressions.{Expression, RowtimeAttribute, TimeAttribute} import org.apache.flink.table.plan.nodes.FlinkConventions import org.apache.flink.table.plan.nodes.dataset.DataSetRel import org.apache.flink.table.plan.rules.FlinkRuleSets import org.apache.flink.table.plan.schema.{DataSetTable, TableSourceTable} +import org.apache.flink.table.runtime.MapRunner +import org.apache.flink.table.runtime.types.CRow import org.apache.flink.table.sinks.{BatchTableSink, TableSink} import org.apache.flink.table.sources.{BatchTableSource, TableSource} import org.apache.flink.types.Row @@ -128,6 +132,56 @@ abstract class BatchTableEnvironment( } /** + * Creates a final converter that maps the internal row type to external type. + * + * @param physicalTypeInfo the input of the sink + * @param logicalRowType the logical type with correct field names (esp. for POJO field mapping) + * @param requestedTypeInfo the output type of the sink + * @param functionName name of the map function. Must not be unique but has to be a + * valid Java class identifier. + */ + override protected def getConversionMapper[IN, OUT]( + physicalTypeInfo: TypeInformation[IN], + logicalRowType: RelDataType, + requestedTypeInfo: TypeInformation[OUT], + functionName: String): + Option[MapFunction[IN, OUT]] = { + + if (requestedTypeInfo.getTypeClass == classOf[Row]) { + // Row to Row, no conversion needed + None + } else if (requestedTypeInfo.getTypeClass == classOf[CRow]) { + // Row to CRow, only needs to be wrapped + Some( + new RichMapFunction[Row, CRow] { + private var outCRow: CRow = _ + override def open(parameters: Configuration): Unit = outCRow = new CRow(null, true) + override def map(value: Row): CRow = { + outCRow.row = value + outCRow + } + }.asInstanceOf[MapFunction[IN, OUT]] + ) + } else { + // some type that is neither Row or CRow + + val converterFunction = generateRowConverterFunction[OUT]( + physicalTypeInfo.asInstanceOf[TypeInformation[Row]], + logicalRowType, + requestedTypeInfo, + functionName + ) + + val mapFunction = new MapRunner[IN, OUT]( + converterFunction.name, + converterFunction.code, + converterFunction.returnType) + + Some(mapFunction) + } + } + + /** * Returns the AST of the specified Table API and SQL queries and the execution plan to compute * the result of the given [[Table]]. * @@ -293,10 +347,15 @@ abstract class BatchTableEnvironment( logicalPlan match { case node: DataSetRel => val plan = node.translateToPlan(this) - val conversion = sinkConversion(plan.getType, logicalType, tpe, "DataSetSinkConversion") + val conversion = + getConversionMapper(plan.getType, logicalType, tpe, "DataSetSinkConversion") conversion match { case None => plan.asInstanceOf[DataSet[A]] // no conversion necessary - case Some(mapFunction) => plan.map(mapFunction).name(s"to: $tpe") + case Some(mapFunction: MapFunction[Row, A]) => + plan.map(mapFunction) + .returns(tpe) + .name(s"to: ${tpe.getTypeClass.getSimpleName}") + .asInstanceOf[DataSet[A]] } case _ => http://git-wip-us.apache.org/repos/asf/flink/blob/27bf4cab/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala index dd2c09d..0632a47 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala @@ -28,6 +28,7 @@ import org.apache.calcite.sql2rel.RelDecorrelator import org.apache.calcite.tools.{RuleSet, RuleSets} import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation} import org.apache.flink.api.common.typeutils.CompositeType +import org.apache.flink.api.common.functions.MapFunction import org.apache.flink.api.java.typeutils.GenericTypeInfo import org.apache.flink.streaming.api.datastream.DataStream import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment @@ -37,7 +38,9 @@ import org.apache.flink.table.expressions.{Expression, ProctimeAttribute, Rowtim import org.apache.flink.table.plan.nodes.FlinkConventions import org.apache.flink.table.plan.nodes.datastream.DataStreamRel import org.apache.flink.table.plan.rules.FlinkRuleSets -import org.apache.flink.table.plan.schema.{DataStreamTable, StreamTableSourceTable, TableSourceTable} +import org.apache.flink.table.plan.schema.{DataStreamTable, StreamTableSourceTable} +import org.apache.flink.table.runtime.CRowInputMapRunner +import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo} import org.apache.flink.table.sinks.{StreamTableSink, TableSink} import org.apache.flink.table.sources.{StreamTableSource, TableSource} import org.apache.flink.table.typeutils.TypeCheckUtils @@ -132,6 +135,52 @@ abstract class StreamTableEnvironment( } } + + /** + * Creates a final converter that maps the internal row type to external type. + * + * @param physicalTypeInfo the input of the sink + * @param logicalRowType the logical type with correct field names (esp. for POJO field mapping) + * @param requestedTypeInfo the output type of the sink + * @param functionName name of the map function. Must not be unique but has to be a + * valid Java class identifier. + */ + override protected def getConversionMapper[IN, OUT]( + physicalTypeInfo: TypeInformation[IN], + logicalRowType: RelDataType, + requestedTypeInfo: TypeInformation[OUT], + functionName: String): + Option[MapFunction[IN, OUT]] = { + + if (requestedTypeInfo.getTypeClass == classOf[CRow]) { + // CRow to CRow, no conversion needed + None + } else if (requestedTypeInfo.getTypeClass == classOf[Row]) { + // CRow to Row, only needs to be unwrapped + Some( + new MapFunction[CRow, Row] { + override def map(value: CRow): Row = value.row + }.asInstanceOf[MapFunction[IN, OUT]] + ) + } else { + // Some type that is neither CRow nor Row + + val converterFunction = generateRowConverterFunction[OUT]( + physicalTypeInfo.asInstanceOf[CRowTypeInfo].rowType, + logicalRowType, + requestedTypeInfo, + functionName + ) + + Some(new CRowInputMapRunner[OUT]( + converterFunction.name, + converterFunction.code, + converterFunction.returnType) + .asInstanceOf[MapFunction[IN, OUT]]) + + } + } + /** * Registers a [[DataStream]] as a table under a given name in the [[TableEnvironment]]'s * catalog. @@ -377,10 +426,15 @@ abstract class StreamTableEnvironment( logicalPlan match { case node: DataStreamRel => val plan = node.translateToPlan(this) - val conversion = sinkConversion(plan.getType, logicalType, tpe, "DataStreamSinkConversion") + val conversion = + getConversionMapper(plan.getType, logicalType, tpe, "DataStreamSinkConversion") conversion match { case None => plan.asInstanceOf[DataStream[A]] // no conversion necessary - case Some(mapFunction) => plan.map(mapFunction).name(s"to: $tpe") + case Some(mapFunction: MapFunction[CRow, A]) => + plan.map(mapFunction) + .returns(tpe) + .name(s"to: ${tpe.getTypeClass.getSimpleName}") + .asInstanceOf[DataStream[A]] } case _ => @@ -398,9 +452,9 @@ abstract class StreamTableEnvironment( def explain(table: Table): String = { val ast = table.getRelNode val optimizedPlan = optimize(ast) - val dataStream = translate[Row]( + val dataStream = translate[CRow]( optimizedPlan, - ast.getRowType)(new GenericTypeInfo(classOf[Row])) + ast.getRowType)(new GenericTypeInfo(classOf[CRow])) val env = dataStream.getExecutionEnvironment val jsonSqlPlan = env.getExecutionPlan http://git-wip-us.apache.org/repos/asf/flink/blob/27bf4cab/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala index 9ed5000..d27db1e 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala @@ -48,7 +48,7 @@ import org.apache.flink.table.api.java.{BatchTableEnvironment => JavaBatchTableE import org.apache.flink.table.api.scala.{BatchTableEnvironment => ScalaBatchTableEnv, StreamTableEnvironment => ScalaStreamTableEnv} import org.apache.flink.table.calcite.{FlinkPlannerImpl, FlinkRelBuilder, FlinkTypeFactory, FlinkTypeSystem} import org.apache.flink.table.catalog.{ExternalCatalog, ExternalCatalogSchema} -import org.apache.flink.table.codegen.{CodeGenerator, ExpressionReducer} +import org.apache.flink.table.codegen.{CodeGenerator, ExpressionReducer, GeneratedFunction} import org.apache.flink.table.expressions.{Alias, Expression, UnresolvedFieldReference} import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils._ import org.apache.flink.table.functions.{ScalarFunction, TableFunction, AggregateFunction} @@ -59,7 +59,7 @@ import org.apache.flink.table.plan.cost.DataSetCostFactory import org.apache.flink.table.plan.logical.{CatalogNode, LogicalRelNode} import org.apache.flink.table.plan.rules.FlinkRuleSets import org.apache.flink.table.plan.schema.RelTable -import org.apache.flink.table.runtime.MapRunner +import org.apache.flink.table.runtime.types.CRowTypeInfo import org.apache.flink.table.sinks.TableSink import org.apache.flink.table.sources.{DefinedFieldNames, TableSource} import org.apache.flink.table.validate.FunctionCatalog @@ -644,6 +644,18 @@ abstract class TableEnvironment(val config: TableConfig) { case _ => throw new TableException( "Field reference expression or alias on field expression expected.") } + case cr: CRowTypeInfo => + exprs.zipWithIndex.map { + case (UnresolvedFieldReference(name), idx) => (idx, name) + case (Alias(UnresolvedFieldReference(origName), name, _), _) => + val idx = cr.getFieldIndex(origName) + if (idx < 0) { + throw new TableException(s"$origName is not a field of type $cr") + } + (idx, name) + case _ => throw new TableException( + "Field reference expression or alias on field expression expected.") + } case c: CaseClassTypeInfo[A] => exprs.zipWithIndex flatMap { case (UnresolvedFieldReference(name), idx) => @@ -694,37 +706,38 @@ abstract class TableEnvironment(val config: TableConfig) { /** * Creates a final converter that maps the internal row type to external type. * - * @param physicalRowTypeInfo the input of the sink + * @param physicalTypeInfo the input of the sink * @param logicalRowType the logical type with correct field names (esp. for POJO field mapping) * @param requestedTypeInfo the output type of the sink * @param functionName name of the map function. Must not be unique but has to be a * valid Java class identifier. */ - protected def sinkConversion[T]( - physicalRowTypeInfo: TypeInformation[Row], + protected def getConversionMapper[IN, OUT]( + physicalTypeInfo: TypeInformation[IN], + logicalRowType: RelDataType, + requestedTypeInfo: TypeInformation[OUT], + functionName: String): + Option[MapFunction[IN, OUT]] + + protected def generateRowConverterFunction[OUT]( + inputTypeInfo: TypeInformation[Row], logicalRowType: RelDataType, - requestedTypeInfo: TypeInformation[T], - functionName: String) - : Option[MapFunction[Row, T]] = { + requestedTypeInfo: TypeInformation[OUT], + functionName: String): + GeneratedFunction[MapFunction[Row, OUT], OUT] = { // validate that at least the field types of physical and logical type match // we do that here to make sure that plan translation was correct val logicalRowTypeInfo = FlinkTypeFactory.toInternalRowTypeInfo(logicalRowType) - if (physicalRowTypeInfo != logicalRowTypeInfo) { + if (logicalRowTypeInfo != inputTypeInfo) { throw TableException("The field types of physical and logical row types do not match." + "This is a bug and should not happen. Please file an issue.") } - // requested type is a generic Row, no conversion needed - if (requestedTypeInfo.isInstanceOf[GenericTypeInfo[_]] && - requestedTypeInfo.getTypeClass == classOf[Row]) { - return None - } - // convert to type information - val logicalFieldTypes = logicalRowType.getFieldList.asScala map { relDataType => - FlinkTypeFactory.toTypeInfo(relDataType.getType) - } + val logicalFieldTypes = logicalRowType.getFieldList.asScala + .map(t => FlinkTypeFactory.toTypeInfo(t.getType)) + // field names val logicalFieldNames = logicalRowType.getFieldNames.asScala @@ -732,8 +745,8 @@ abstract class TableEnvironment(val config: TableConfig) { if (requestedTypeInfo.getArity != logicalFieldTypes.length) { throw new TableException("Arity of result does not match requested type.") } - requestedTypeInfo match { + requestedTypeInfo match { // POJO type requested case pt: PojoTypeInfo[_] => logicalFieldNames.zip(logicalFieldTypes) foreach { @@ -780,7 +793,7 @@ abstract class TableEnvironment(val config: TableConfig) { val generator = new CodeGenerator( config, false, - physicalRowTypeInfo, + inputTypeInfo, None, None) @@ -794,20 +807,12 @@ abstract class TableEnvironment(val config: TableConfig) { |return ${conversion.resultTerm}; |""".stripMargin - val genFunction = generator.generateFunction( + generator.generateFunction( functionName, - classOf[MapFunction[Row, T]], + classOf[MapFunction[Row, OUT]], body, requestedTypeInfo) - - val mapFunction = new MapRunner[Row, T]( - genFunction.name, - genFunction.code, - genFunction.returnType) - - Some(mapFunction) } - } /** http://git-wip-us.apache.org/repos/asf/flink/blob/27bf4cab/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala index ff5ffb2..e875587 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala @@ -20,26 +20,26 @@ package org.apache.flink.table.plan.nodes import org.apache.calcite.plan.{RelOptCost, RelOptPlanner} import org.apache.calcite.rex._ -import org.apache.flink.api.common.functions.{FlatMapFunction, RichFlatMapFunction} +import org.apache.flink.api.common.functions.FlatMapFunction import org.apache.flink.table.api.TableConfig import org.apache.flink.table.calcite.FlinkTypeFactory import org.apache.flink.table.codegen.{CodeGenerator, GeneratedFunction} import org.apache.flink.table.plan.schema.RowSchema -import org.apache.flink.table.runtime.FlatMapRunner import org.apache.flink.types.Row import scala.collection.JavaConversions._ import scala.collection.JavaConverters._ -trait CommonCalc { +trait CommonCalc[T] { - private[flink] def functionBody( + private[flink] def generateFunction( generator: CodeGenerator, + ruleDescription: String, inputSchema: RowSchema, returnSchema: RowSchema, calcProgram: RexProgram, - config: TableConfig) - : String = { + config: TableConfig): + GeneratedFunction[FlatMapFunction[Row, Row], Row] = { val expandedExpressions = calcProgram .getProjectList @@ -61,7 +61,7 @@ trait CommonCalc { expandedExpressions) // only projection - if (condition == null) { + val body = if (condition == null) { s""" |${projection.code} |${generator.collectorTerm}.collect(${projection.resultTerm}); @@ -89,16 +89,12 @@ trait CommonCalc { |""".stripMargin } } - } - - private[flink] def calcMapFunction( - genFunction: GeneratedFunction[FlatMapFunction[Row, Row], Row]) - : RichFlatMapFunction[Row, Row] = { - new FlatMapRunner[Row, Row]( - genFunction.name, - genFunction.code, - genFunction.returnType) + generator.generateFunction( + ruleDescription, + classOf[FlatMapFunction[Row, Row]], + body, + returnSchema.physicalTypeInfo) } private[flink] def conditionToString( @@ -168,8 +164,8 @@ trait CommonCalc { // CASTs in RexProgram are reduced as far as possible by ReduceExpressionsRule // in normalization stage. So we should ignore CASTs here in optimization stage. val compCnt = calcProgram.getExprList.asScala.toList.count { - case i: RexInputRef => false - case l: RexLiteral => false + case _: RexInputRef => false + case _: RexLiteral => false case c: RexCall if c.getOperator.getName.equals("CAST") => false case _ => true } http://git-wip-us.apache.org/repos/asf/flink/blob/27bf4cab/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala index 02305ee..44a109e3 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala @@ -28,7 +28,7 @@ import org.apache.flink.table.codegen.GeneratedExpression.{ALWAYS_NULL, NO_CODE} import org.apache.flink.table.codegen.{CodeGenerator, GeneratedCollector, GeneratedExpression, GeneratedFunction} import org.apache.flink.table.functions.utils.TableSqlFunction import org.apache.flink.table.plan.schema.RowSchema -import org.apache.flink.table.runtime.{CorrelateFlatMapRunner, TableFunctionCollector} +import org.apache.flink.table.runtime.TableFunctionCollector import org.apache.flink.types.Row import scala.collection.JavaConverters._ @@ -36,72 +36,27 @@ import scala.collection.JavaConverters._ /** * Join a user-defined table function */ -trait CommonCorrelate { - - /** - * Creates the [[CorrelateFlatMapRunner]] to execute the join of input table - * and user-defined table function. - */ - private[flink] def correlateMapFunction( - config: TableConfig, - inputSchema: RowSchema, - udtfTypeInfo: TypeInformation[Any], - returnSchema: RowSchema, - joinType: SemiJoinType, - rexCall: RexCall, - condition: Option[RexNode], - pojoFieldMapping: Option[Array[Int]], // udtf return type pojo field mapping - ruleDescription: String) - : CorrelateFlatMapRunner[Row, Row] = { - - val flatMap = generateFunction( - config, - inputSchema.physicalTypeInfo, - udtfTypeInfo, - returnSchema.physicalTypeInfo, - returnSchema.logicalFieldNames, - joinType, - inputSchema.mapRexNode(rexCall).asInstanceOf[RexCall], - pojoFieldMapping, - ruleDescription) - - val collector = generateCollector( - config, - inputSchema.physicalTypeInfo, - udtfTypeInfo, - returnSchema.physicalTypeInfo, - returnSchema.logicalFieldNames, - condition.map(inputSchema.mapRexNode), - pojoFieldMapping) - - new CorrelateFlatMapRunner[Row, Row]( - flatMap.name, - flatMap.code, - collector.name, - collector.code, - flatMap.returnType) - - } +trait CommonCorrelate[T] { /** * Generates the flat map function to run the user-defined table function. */ - private def generateFunction( - config: TableConfig, - inputTypeInfo: TypeInformation[Row], - udtfTypeInfo: TypeInformation[Any], - returnType: TypeInformation[Row], - resultFieldNames: Seq[String], - joinType: SemiJoinType, - rexCall: RexCall, - pojoFieldMapping: Option[Array[Int]], - ruleDescription: String) - : GeneratedFunction[FlatMapFunction[Row, Row], Row] = { + private[flink] def generateFunction( + config: TableConfig, + inputSchema: RowSchema, + udtfTypeInfo: TypeInformation[Any], + returnSchema: RowSchema, + rowType: RelDataType, + joinType: SemiJoinType, + rexCall: RexCall, + pojoFieldMapping: Option[Array[Int]], + ruleDescription: String): + GeneratedFunction[FlatMapFunction[Row, Row], Row] = { val functionGenerator = new CodeGenerator( config, false, - inputTypeInfo, + inputSchema.physicalTypeInfo, Some(udtfTypeInfo), None, pojoFieldMapping) @@ -115,9 +70,9 @@ trait CommonCorrelate { val call = functionGenerator.generateExpression(rexCall) var body = s""" - |${call.resultTerm}.setCollector($collectorTerm); - |${call.code} - |""".stripMargin + |${call.resultTerm}.setCollector($collectorTerm); + |${call.code} + |""".stripMargin if (joinType == SemiJoinType.LEFT) { // left outer join @@ -132,15 +87,17 @@ trait CommonCorrelate { x.resultType) } val outerResultExpr = functionGenerator.generateResultExpression( - input1AccessExprs ++ input2NullExprs, returnType, resultFieldNames) + input1AccessExprs ++ input2NullExprs, + returnSchema.physicalTypeInfo, + rowType.getFieldNames.asScala) body += s""" - |boolean hasOutput = $collectorTerm.isCollected(); - |if (!hasOutput) { - | ${outerResultExpr.code} - | ${functionGenerator.collectorTerm}.collect(${outerResultExpr.resultTerm}); - |} - |""".stripMargin + |boolean hasOutput = $collectorTerm.isCollected(); + |if (!hasOutput) { + | ${outerResultExpr.code} + | ${functionGenerator.collectorTerm}.collect(${outerResultExpr.resultTerm}); + |} + |""".stripMargin } else if (joinType != SemiJoinType.INNER) { throw TableException(s"Unsupported SemiJoinType: $joinType for correlate join.") } @@ -149,26 +106,26 @@ trait CommonCorrelate { ruleDescription, classOf[FlatMapFunction[Row, Row]], body, - returnType) + returnSchema.physicalTypeInfo) } /** * Generates table function collector. */ private[flink] def generateCollector( - config: TableConfig, - inputTypeInfo: TypeInformation[Row], - udtfTypeInfo: TypeInformation[Any], - returnType: TypeInformation[Row], - resultFieldNames: Seq[String], - condition: Option[RexNode], - pojoFieldMapping: Option[Array[Int]]) - : GeneratedCollector = { + config: TableConfig, + inputSchema: RowSchema, + udtfTypeInfo: TypeInformation[Any], + returnSchema: RowSchema, + rowType: RelDataType, + condition: Option[RexNode], + pojoFieldMapping: Option[Array[Int]]) + : GeneratedCollector = { val generator = new CodeGenerator( config, false, - inputTypeInfo, + inputSchema.physicalTypeInfo, Some(udtfTypeInfo), None, pojoFieldMapping) @@ -177,26 +134,26 @@ trait CommonCorrelate { val crossResultExpr = generator.generateResultExpression( input1AccessExprs ++ input2AccessExprs, - returnType, - resultFieldNames) + returnSchema.physicalTypeInfo, + rowType.getFieldNames.asScala) val collectorCode = if (condition.isEmpty) { s""" - |${crossResultExpr.code} - |getCollector().collect(${crossResultExpr.resultTerm}); - |""".stripMargin + |${crossResultExpr.code} + |getCollector().collect(${crossResultExpr.resultTerm}); + |""".stripMargin } else { val filterGenerator = new CodeGenerator(config, false, udtfTypeInfo, None, pojoFieldMapping) filterGenerator.input1Term = filterGenerator.input2Term val filterCondition = filterGenerator.generateExpression(condition.get) s""" - |${filterGenerator.reuseInputUnboxingCode()} - |${filterCondition.code} - |if (${filterCondition.resultTerm}) { - | ${crossResultExpr.code} - | getCollector().collect(${crossResultExpr.resultTerm}); - |} - |""".stripMargin + |${filterGenerator.reuseInputUnboxingCode()} + |${filterCondition.code} + |if (${filterCondition.resultTerm}) { + | ${crossResultExpr.code} + | getCollector().collect(${crossResultExpr.resultTerm}); + |} + |""".stripMargin } generator.generateTableFunctionCollector( http://git-wip-us.apache.org/repos/asf/flink/blob/27bf4cab/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonScan.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonScan.scala index 091a1ea..7ce73ee 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonScan.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonScan.scala @@ -27,7 +27,7 @@ import org.apache.flink.types.Row /** * Common class for batch and stream scans. */ -trait CommonScan { +trait CommonScan[T] { /** * We check if the input type is exactly the same as the internal row type. @@ -35,11 +35,8 @@ trait CommonScan { */ private[flink] def needsConversion( externalTypeInfo: TypeInformation[Any], - internalTypeInfo: TypeInformation[Row]) - : Boolean = { - + internalTypeInfo: TypeInformation[T]): Boolean = externalTypeInfo != internalTypeInfo - } private[flink] def generatedConversionFunction[F <: Function]( config: TableConfig, http://git-wip-us.apache.org/repos/asf/flink/blob/27bf4cab/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchScan.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchScan.scala index cc5d9fb..95707b8 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchScan.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchScan.scala @@ -30,7 +30,7 @@ import org.apache.flink.types.Row import scala.collection.JavaConversions._ import scala.collection.JavaConverters._ -trait BatchScan extends CommonScan with DataSetRel { +trait BatchScan extends CommonScan[Row] with DataSetRel { protected def convertToInternalRow( input: DataSet[Any], http://git-wip-us.apache.org/repos/asf/flink/blob/27bf4cab/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala index 5274fa1..e340a8c 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala @@ -24,7 +24,6 @@ import org.apache.calcite.rel.core.Calc import org.apache.calcite.rel.metadata.RelMetadataQuery import org.apache.calcite.rel.{RelNode, RelWriter} import org.apache.calcite.rex._ -import org.apache.flink.api.common.functions.FlatMapFunction import org.apache.flink.api.java.DataSet import org.apache.flink.api.java.typeutils.RowTypeInfo import org.apache.flink.table.api.BatchTableEnvironment @@ -47,7 +46,7 @@ class DataSetCalc( calcProgram: RexProgram, ruleDescription: String) extends Calc(cluster, traitSet, input, calcProgram) - with CommonCalc + with CommonCalc[Row] with DataSetRel { override def deriveRowType(): RelDataType = rowRelDataType @@ -88,22 +87,17 @@ class DataSetCalc( val generator = new CodeGenerator(config, false, inputDS.getType) - val rowTypeInfo = FlinkTypeFactory.toInternalRowTypeInfo(getRowType).asInstanceOf[RowTypeInfo] + val returnType = FlinkTypeFactory.toInternalRowTypeInfo(getRowType).asInstanceOf[RowTypeInfo] - val body = functionBody( + val genFunction = generateFunction( generator, + ruleDescription, new RowSchema(getInput.getRowType), new RowSchema(getRowType), calcProgram, config) - val genFunction = generator.generateFunction( - ruleDescription, - classOf[FlatMapFunction[Row, Row]], - body, - rowTypeInfo) - - val runner = calcMapFunction(genFunction) + val runner = new FlatMapRunner(genFunction.name, genFunction.code, returnType) inputDS.flatMap(runner).name(calcOpName(calcProgram, getExpressionString)) } http://git-wip-us.apache.org/repos/asf/flink/blob/27bf4cab/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCorrelate.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCorrelate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCorrelate.scala index 6c79b45..49ead26 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCorrelate.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCorrelate.scala @@ -32,6 +32,7 @@ import org.apache.flink.table.functions.utils.TableSqlFunction import org.apache.flink.table.plan.nodes.CommonCorrelate import org.apache.flink.table.plan.nodes.logical.FlinkLogicalTableFunctionScan import org.apache.flink.table.plan.schema.RowSchema +import org.apache.flink.table.runtime.CorrelateFlatMapRunner import org.apache.flink.types.Row /** @@ -48,7 +49,7 @@ class DataSetCorrelate( joinType: SemiJoinType, ruleDescription: String) extends SingleRel(cluster, traitSet, inputNode) - with CommonCorrelate + with CommonCorrelate[Row] with DataSetRel { override def deriveRowType() = relRowType @@ -98,22 +99,38 @@ class DataSetCorrelate( val funcRel = scan.asInstanceOf[FlinkLogicalTableFunctionScan] val rexCall = funcRel.getCall.asInstanceOf[RexCall] val sqlFunction = rexCall.getOperator.asInstanceOf[TableSqlFunction] - val pojoFieldMapping = sqlFunction.getPojoFieldMapping + val pojoFieldMapping = Some(sqlFunction.getPojoFieldMapping) val udtfTypeInfo = sqlFunction.getRowTypeInfo.asInstanceOf[TypeInformation[Any]] - val rowTypeInfo = FlinkTypeFactory.toInternalRowTypeInfo(getRowType).asInstanceOf[RowTypeInfo] + val returnType = FlinkTypeFactory.toInternalRowTypeInfo(getRowType).asInstanceOf[RowTypeInfo] - val mapFunc = correlateMapFunction( + val flatMap = generateFunction( config, new RowSchema(getInput.getRowType), udtfTypeInfo, new RowSchema(getRowType), + rowType, joinType, rexCall, - condition, - Some(pojoFieldMapping), + pojoFieldMapping, ruleDescription) + val collector = generateCollector( + config, + new RowSchema(getInput.getRowType), + udtfTypeInfo, + new RowSchema(getRowType), + rowType, + condition, + pojoFieldMapping) + + val mapFunc = new CorrelateFlatMapRunner[Row, Row]( + flatMap.name, + flatMap.code, + collector.name, + collector.code, + flatMap.returnType) + inputDS.flatMap(mapFunc).name(correlateOpName(rexCall, sqlFunction, relRowType)) } } http://git-wip-us.apache.org/repos/asf/flink/blob/27bf4cab/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetValues.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetValues.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetValues.scala index 3ebee2c..948dd27 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetValues.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetValues.scala @@ -88,7 +88,7 @@ class DataSetValues( generatedRecords.map(_.code), returnType) - val inputFormat = new ValuesInputFormat[Row]( + val inputFormat = new ValuesInputFormat( generatedFunction.name, generatedFunction.code, generatedFunction.returnType) http://git-wip-us.apache.org/repos/asf/flink/blob/27bf4cab/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala index c6c25c0..59f723ac 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala @@ -24,13 +24,13 @@ import org.apache.calcite.rel.core.Calc import org.apache.calcite.rel.metadata.RelMetadataQuery import org.apache.calcite.rel.{RelNode, RelWriter} import org.apache.calcite.rex.RexProgram -import org.apache.flink.api.common.functions.FlatMapFunction import org.apache.flink.streaming.api.datastream.DataStream import org.apache.flink.table.api.StreamTableEnvironment +import org.apache.flink.table.plan.schema.RowSchema import org.apache.flink.table.codegen.CodeGenerator import org.apache.flink.table.plan.nodes.CommonCalc -import org.apache.flink.table.plan.schema.RowSchema -import org.apache.flink.types.Row +import org.apache.flink.table.runtime.CRowFlatMapRunner +import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo} /** * Flink RelNode which matches along with FlatMapOperator. @@ -45,7 +45,7 @@ class DataStreamCalc( calcProgram: RexProgram, ruleDescription: String) extends Calc(cluster, traitSet, input, calcProgram) - with CommonCalc + with CommonCalc[CRow] with DataStreamRel { override def deriveRowType(): RelDataType = schema.logicalType @@ -83,28 +83,28 @@ class DataStreamCalc( estimateRowCount(calcProgram, rowCnt) } - override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[Row] = { + override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[CRow] = { val config = tableEnv.getConfig val inputDataStream = getInput.asInstanceOf[DataStreamRel].translateToPlan(tableEnv) + val inputRowType = inputDataStream.getType.asInstanceOf[CRowTypeInfo].rowType - val generator = new CodeGenerator(config, false, inputDataStream.getType) + val generator = new CodeGenerator(config, false, inputRowType) - val body = functionBody( + val genFunction = generateFunction( generator, + ruleDescription, inputSchema, schema, calcProgram, config) - val genFunction = generator.generateFunction( - ruleDescription, - classOf[FlatMapFunction[Row, Row]], - body, - schema.physicalTypeInfo) + val mapFunc = new CRowFlatMapRunner( + genFunction.name, + genFunction.code, + CRowTypeInfo(schema.physicalTypeInfo)) - val mapFunc = calcMapFunction(genFunction) inputDataStream.flatMap(mapFunc).name(calcOpName(calcProgram, getExpressionString)) } } http://git-wip-us.apache.org/repos/asf/flink/blob/27bf4cab/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCorrelate.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCorrelate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCorrelate.scala index 899d8ef..19ad89b 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCorrelate.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCorrelate.scala @@ -28,7 +28,8 @@ import org.apache.flink.table.functions.utils.TableSqlFunction import org.apache.flink.table.plan.nodes.CommonCorrelate import org.apache.flink.table.plan.nodes.logical.FlinkLogicalTableFunctionScan import org.apache.flink.table.plan.schema.RowSchema -import org.apache.flink.types.Row +import org.apache.flink.table.runtime.CRowCorrelateFlatMapRunner +import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo} /** * Flink RelNode which matches along with join a user defined table function. @@ -45,7 +46,7 @@ class DataStreamCorrelate( joinType: SemiJoinType, ruleDescription: String) extends SingleRel(cluster, traitSet, input) - with CommonCorrelate + with CommonCorrelate[CRow] with DataStreamRel { override def deriveRowType() = schema.logicalType @@ -81,31 +82,54 @@ class DataStreamCorrelate( .itemIf("condition", condition.orNull, condition.isDefined) } - override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[Row] = { + override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[CRow] = { val config = tableEnv.getConfig // we do not need to specify input type val inputDS = getInput.asInstanceOf[DataStreamRel].translateToPlan(tableEnv) + val inputType = inputDS.getType.asInstanceOf[CRowTypeInfo] val funcRel = scan.asInstanceOf[FlinkLogicalTableFunctionScan] val rexCall = funcRel.getCall.asInstanceOf[RexCall] val sqlFunction = rexCall.getOperator.asInstanceOf[TableSqlFunction] - val pojoFieldMapping = sqlFunction.getPojoFieldMapping + val pojoFieldMapping = Some(sqlFunction.getPojoFieldMapping) val udtfTypeInfo = sqlFunction.getRowTypeInfo.asInstanceOf[TypeInformation[Any]] - val mapFunc = correlateMapFunction( + val flatMap = generateFunction( config, inputSchema, udtfTypeInfo, schema, + getRowType, joinType, rexCall, - condition, - Some(pojoFieldMapping), + pojoFieldMapping, ruleDescription) - inputDS.flatMap(mapFunc).name(correlateOpName(rexCall, sqlFunction, schema.logicalType)) + val collector = generateCollector( + config, + inputSchema, + udtfTypeInfo, + schema, + getRowType, + condition, + pojoFieldMapping) + + val mapFunc = new CRowCorrelateFlatMapRunner( + flatMap.name, + flatMap.code, + collector.name, + collector.code, + CRowTypeInfo(flatMap.returnType)) + + val inputParallelism = inputDS.getParallelism + + inputDS + .flatMap(mapFunc) + // preserve input parallelism to ensure that acc and retract messages remain in order + .setParallelism(inputParallelism) + .name(correlateOpName(rexCall, sqlFunction, schema.logicalType)) } } http://git-wip-us.apache.org/repos/asf/flink/blob/27bf4cab/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala index 3555c80..056cda9 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala @@ -28,8 +28,9 @@ import org.apache.flink.table.codegen.CodeGenerator import org.apache.flink.table.runtime.aggregate._ import org.apache.flink.table.plan.nodes.CommonAggregate import org.apache.flink.table.plan.schema.RowSchema -import org.apache.flink.types.Row +import org.apache.flink.table.plan.rules.datastream.DataStreamRetractionRules import org.apache.flink.table.runtime.aggregate.AggregateUtil.CalcitePair +import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo} /** * @@ -97,15 +98,18 @@ class DataStreamGroupAggregate( inputSchema.logicalType, groupings, getRowType, namedAggregates, Nil)) } - override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[Row] = { + override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[CRow] = { val inputDS = input.asInstanceOf[DataStreamRel].translateToPlan(tableEnv) + val physicalNamedAggregates = namedAggregates.map { namedAggregate => new CalcitePair[AggregateCall, String]( inputSchema.mapAggregateCall(namedAggregate.left), namedAggregate.right) } + val outRowType = CRowTypeInfo(schema.physicalTypeInfo) + val generator = new CodeGenerator( tableEnv.getConfig, false, @@ -129,28 +133,30 @@ class DataStreamGroupAggregate( physicalNamedAggregates, inputSchema.logicalType, inputSchema.physicalFieldTypeInfo, - groupings) + groupings, + DataStreamRetractionRules.isAccRetract(this), + DataStreamRetractionRules.isAccRetract(getInput)) - val result: DataStream[Row] = + val result: DataStream[CRow] = // grouped / keyed aggregation if (physicalGrouping.nonEmpty) { inputDS .keyBy(groupings: _*) .process(processFunction) - .returns(schema.physicalTypeInfo) + .returns(outRowType) .name(keyedAggOpName) - .asInstanceOf[DataStream[Row]] + .asInstanceOf[DataStream[CRow]] } // global / non-keyed aggregation else { inputDS - .keyBy(new NullByteKeySelector[Row]) + .keyBy(new NullByteKeySelector[CRow]) .process(processFunction) .setParallelism(1) .setMaxParallelism(1) - .returns(schema.physicalTypeInfo) + .returns(outRowType) .name(nonKeyedAggOpName) - .asInstanceOf[DataStream[Row]] + .asInstanceOf[DataStream[CRow]] } result } http://git-wip-us.apache.org/repos/asf/flink/blob/27bf4cab/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala index ea4b0bf..f61828b 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala @@ -26,7 +26,7 @@ import org.apache.flink.api.java.tuple.Tuple import org.apache.flink.streaming.api.datastream.{AllWindowedStream, DataStream, KeyedStream, WindowedStream} import org.apache.flink.streaming.api.windowing.assigners._ import org.apache.flink.streaming.api.windowing.windows.{Window => DataStreamWindow} -import org.apache.flink.table.api.StreamTableEnvironment +import org.apache.flink.table.api.{StreamTableEnvironment, TableException} import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty import org.apache.flink.table.codegen.CodeGenerator import org.apache.flink.table.expressions.ExpressionUtils._ @@ -34,10 +34,12 @@ import org.apache.flink.table.plan.logical._ import org.apache.flink.table.plan.nodes.CommonAggregate import org.apache.flink.table.plan.schema.RowSchema import org.apache.flink.table.plan.nodes.datastream.DataStreamGroupWindowAggregate._ +import org.apache.flink.table.plan.rules.datastream.DataStreamRetractionRules import org.apache.flink.table.runtime.aggregate.AggregateUtil._ import org.apache.flink.table.runtime.aggregate._ +import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo} import org.apache.flink.table.typeutils.TypeCheckUtils.isTimeInterval -import org.apache.flink.types.Row +import org.apache.flink.table.typeutils.{RowIntervalTypeInfo, TimeIntervalTypeInfo} class DataStreamGroupWindowAggregate( window: LogicalWindow, @@ -101,14 +103,25 @@ class DataStreamGroupWindowAggregate( namedProperties)) } - override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[Row] = { + override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[CRow] = { val inputDS = input.asInstanceOf[DataStreamRel].translateToPlan(tableEnv) + val physicalNamedAggregates = namedAggregates.map { namedAggregate => new CalcitePair[AggregateCall, String]( inputSchema.mapAggregateCall(namedAggregate.left), namedAggregate.right) } + val consumeRetraction = DataStreamRetractionRules.isAccRetract(input) + + if (consumeRetraction) { + throw new TableException( + "Retraction on windowed GroupBy aggregation is not supported yet. " + + "Note: Windowed GroupBy aggregation should not follow a " + + "non-windowed GroupBy aggregation.") + } + + val outRowType = CRowTypeInfo(schema.physicalTypeInfo) val aggString = aggregationToString( inputSchema.logicalType, @@ -145,7 +158,7 @@ class DataStreamGroupWindowAggregate( val keyedStream = inputDS.keyBy(physicalGrouping: _*) val windowedStream = createKeyedWindowedStream(window, keyedStream) - .asInstanceOf[WindowedStream[Row, Tuple, DataStreamWindow]] + .asInstanceOf[WindowedStream[CRow, Tuple, DataStreamWindow]] val (aggFunction, accumulatorRowType, aggResultRowType) = AggregateUtil.createDataStreamAggregateFunction( @@ -154,15 +167,11 @@ class DataStreamGroupWindowAggregate( inputSchema.physicalType, inputSchema.physicalFieldTypeInfo, schema.physicalType, + physicalGrouping, needMerge) windowedStream - .aggregate( - aggFunction, - windowFunction, - accumulatorRowType, - aggResultRowType, - schema.physicalTypeInfo) + .aggregate(aggFunction, windowFunction, accumulatorRowType, aggResultRowType, outRowType) .name(keyedAggOpName) } // global / non-keyed aggregation @@ -174,7 +183,7 @@ class DataStreamGroupWindowAggregate( val windowedStream = createNonKeyedWindowedStream(window, inputDS) - .asInstanceOf[AllWindowedStream[Row, DataStreamWindow]] + .asInstanceOf[AllWindowedStream[CRow, DataStreamWindow]] val (aggFunction, accumulatorRowType, aggResultRowType) = AggregateUtil.createDataStreamAggregateFunction( @@ -183,15 +192,11 @@ class DataStreamGroupWindowAggregate( inputSchema.physicalType, inputSchema.physicalFieldTypeInfo, schema.physicalType, + Array[Int](), needMerge) windowedStream - .aggregate( - aggFunction, - windowFunction, - accumulatorRowType, - aggResultRowType, - schema.physicalTypeInfo) + .aggregate(aggFunction, windowFunction, accumulatorRowType, aggResultRowType, outRowType) .name(nonKeyedAggOpName) } } @@ -199,9 +204,10 @@ class DataStreamGroupWindowAggregate( object DataStreamGroupWindowAggregate { - - private def createKeyedWindowedStream(groupWindow: LogicalWindow, stream: KeyedStream[Row, Tuple]) - : WindowedStream[Row, Tuple, _ <: DataStreamWindow] = groupWindow match { + private def createKeyedWindowedStream( + groupWindow: LogicalWindow, + stream: KeyedStream[CRow, Tuple]): + WindowedStream[CRow, Tuple, _ <: DataStreamWindow] = groupWindow match { case TumblingGroupWindow(_, timeField, size) if isProctimeAttribute(timeField) && isTimeIntervalLiteral(size)=> @@ -250,8 +256,10 @@ object DataStreamGroupWindowAggregate { stream.window(EventTimeSessionWindows.withGap(toTime(gap))) } - private def createNonKeyedWindowedStream(groupWindow: LogicalWindow, stream: DataStream[Row]) - : AllWindowedStream[Row, _ <: DataStreamWindow] = groupWindow match { + private def createNonKeyedWindowedStream( + groupWindow: LogicalWindow, + stream: DataStream[CRow]): + AllWindowedStream[CRow, _ <: DataStreamWindow] = groupWindow match { case TumblingGroupWindow(_, timeField, size) if isProctimeAttribute(timeField) && isTimeIntervalLiteral(size) => http://git-wip-us.apache.org/repos/asf/flink/blob/27bf4cab/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala index fb912c4..e823cd6 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala @@ -20,21 +20,22 @@ package org.apache.flink.table.plan.nodes.datastream import java.util.{List => JList} import org.apache.calcite.plan.{RelOptCluster, RelTraitSet} -import org.apache.calcite.rel.RelFieldCollation.Direction.ASCENDING import org.apache.calcite.rel.`type`.RelDataType import org.apache.calcite.rel.core.Window.Group import org.apache.calcite.rel.core.{AggregateCall, Window} import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel} -import org.apache.flink.api.java.functions.NullByteKeySelector +import org.apache.calcite.rel.RelFieldCollation.Direction.ASCENDING import org.apache.flink.streaming.api.datastream.DataStream import org.apache.flink.table.api.{StreamTableEnvironment, TableException} import org.apache.flink.table.calcite.FlinkTypeFactory -import org.apache.flink.table.codegen.CodeGenerator import org.apache.flink.table.plan.nodes.OverAggregate import org.apache.flink.table.plan.schema.RowSchema -import org.apache.flink.table.runtime.aggregate.AggregateUtil.CalcitePair import org.apache.flink.table.runtime.aggregate._ -import org.apache.flink.types.Row +import org.apache.flink.api.java.functions.NullByteKeySelector +import org.apache.flink.table.codegen.CodeGenerator +import org.apache.flink.table.plan.rules.datastream.DataStreamRetractionRules +import org.apache.flink.table.runtime.aggregate.AggregateUtil.CalcitePair +import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo} class DataStreamOverAggregate( logicWindow: Window, @@ -87,7 +88,7 @@ class DataStreamOverAggregate( namedAggregates)) } - override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[Row] = { + override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[CRow] = { if (logicWindow.groups.size > 1) { throw new TableException( "Unsupported use of OVER windows. All aggregates must be computed on the same window.") @@ -110,6 +111,8 @@ class DataStreamOverAggregate( val inputDS = input.asInstanceOf[DataStreamRel].translateToPlan(tableEnv) + val consumeRetraction = DataStreamRetractionRules.isAccRetract(input) + val generator = new CodeGenerator( tableEnv.getConfig, false, @@ -120,6 +123,12 @@ class DataStreamOverAggregate( .get(orderKey.getFieldIndex) .getType + if (consumeRetraction) { + throw new TableException( + "Retraction on Over window aggregation is not supported yet. " + + "Note: Over window aggregation should not follow a non-windowed GroupBy aggregation.") + } + timeType match { case _ if FlinkTypeFactory.isProctimeIndicatorType(timeType) => // proc-time OVER window @@ -138,8 +147,7 @@ class DataStreamOverAggregate( generator, inputDS, isRowTimeType = false, - isRowsClause = overWindow.isRows - ) + isRowsClause = overWindow.isRows) } else { throw new TableException( "OVER RANGE FOLLOWING windows are not supported yet.") @@ -154,16 +162,14 @@ class DataStreamOverAggregate( generator, inputDS, isRowTimeType = true, - isRowsClause = overWindow.isRows - ) + isRowsClause = overWindow.isRows) } else if (overWindow.lowerBound.isPreceding && overWindow.upperBound.isCurrentRow) { // bounded OVER window createBoundedAndCurrentRowOverWindow( generator, inputDS, isRowTimeType = true, - isRowsClause = overWindow.isRows - ) + isRowsClause = overWindow.isRows) } else { throw new TableException( "OVER RANGE FOLLOWING windows are not supported yet.") @@ -177,12 +183,14 @@ class DataStreamOverAggregate( def createUnboundedAndCurrentRowOverWindow( generator: CodeGenerator, - inputDS: DataStream[Row], + inputDS: DataStream[CRow], isRowTimeType: Boolean, - isRowsClause: Boolean): DataStream[Row] = { + isRowsClause: Boolean): DataStream[CRow] = { val overWindow: Group = logicWindow.groups.get(0) + val partitionKeys: Array[Int] = overWindow.keys.toArray.map(schema.mapIndex) + val namedAggregates: Seq[CalcitePair[AggregateCall, String]] = generateNamedAggregates.map { namedAggregate => new CalcitePair[AggregateCall, String]( @@ -190,6 +198,9 @@ class DataStreamOverAggregate( namedAggregate.right) } + // get the output types + val returnTypeInfo = CRowTypeInfo(schema.physicalTypeInfo) + val processFunction = AggregateUtil.createUnboundedOverProcessFunction( generator, namedAggregates, @@ -200,30 +211,28 @@ class DataStreamOverAggregate( partitionKeys.nonEmpty, isRowsClause) - val result: DataStream[Row] = + val result: DataStream[CRow] = // partitioned aggregation if (partitionKeys.nonEmpty) { inputDS .keyBy(partitionKeys: _*) .process(processFunction) - .returns(schema.physicalTypeInfo) + .returns(returnTypeInfo) .name(aggOpName) - .asInstanceOf[DataStream[Row]] + .asInstanceOf[DataStream[CRow]] } // non-partitioned aggregation else { if (isRowTimeType) { - inputDS.keyBy(new NullByteKeySelector[Row]) + inputDS.keyBy(new NullByteKeySelector[CRow]) .process(processFunction).setParallelism(1).setMaxParallelism(1) - .returns(schema.physicalTypeInfo) + .returns(returnTypeInfo) .name(aggOpName) - .asInstanceOf[DataStream[Row]] } else { inputDS .process(processFunction).setParallelism(1).setMaxParallelism(1) - .returns(schema.physicalTypeInfo) + .returns(returnTypeInfo) .name(aggOpName) - .asInstanceOf[DataStream[Row]] } } result @@ -231,9 +240,9 @@ class DataStreamOverAggregate( def createBoundedAndCurrentRowOverWindow( generator: CodeGenerator, - inputDS: DataStream[Row], + inputDS: DataStream[CRow], isRowTimeType: Boolean, - isRowsClause: Boolean): DataStream[Row] = { + isRowsClause: Boolean): DataStream[CRow] = { val overWindow: Group = logicWindow.groups.get(0) val partitionKeys: Array[Int] = overWindow.keys.toArray.map(schema.mapIndex) @@ -245,7 +254,10 @@ class DataStreamOverAggregate( } val precedingOffset = - getLowerBoundary(logicWindow, overWindow, input) + (if (isRowsClause) 1 else 0) + getLowerBoundary(logicWindow, overWindow, getInput()) + (if (isRowsClause) 1 else 0) + + // get the output types + val returnTypeInfo = CRowTypeInfo(schema.physicalTypeInfo) val processFunction = AggregateUtil.createBoundedOverProcessFunction( generator, @@ -257,24 +269,22 @@ class DataStreamOverAggregate( isRowsClause, isRowTimeType ) - val result: DataStream[Row] = + val result: DataStream[CRow] = // partitioned aggregation if (partitionKeys.nonEmpty) { inputDS .keyBy(partitionKeys: _*) .process(processFunction) - .returns(schema.physicalTypeInfo) + .returns(returnTypeInfo) .name(aggOpName) - .asInstanceOf[DataStream[Row]] } // non-partitioned aggregation else { inputDS - .keyBy(new NullByteKeySelector[Row]) + .keyBy(new NullByteKeySelector[CRow]) .process(processFunction).setParallelism(1).setMaxParallelism(1) - .returns(schema.physicalTypeInfo) + .returns(returnTypeInfo) .name(aggOpName) - .asInstanceOf[DataStream[Row]] } result } http://git-wip-us.apache.org/repos/asf/flink/blob/27bf4cab/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRel.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRel.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRel.scala index 128da81..9754de4 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRel.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRel.scala @@ -21,7 +21,7 @@ package org.apache.flink.table.plan.nodes.datastream import org.apache.flink.streaming.api.datastream.DataStream import org.apache.flink.table.api.StreamTableEnvironment import org.apache.flink.table.plan.nodes.FlinkRelNode -import org.apache.flink.types.Row +import org.apache.flink.table.runtime.types.CRow trait DataStreamRel extends FlinkRelNode { @@ -29,9 +29,9 @@ trait DataStreamRel extends FlinkRelNode { * Translates the FlinkRelNode into a Flink operator. * * @param tableEnv The [[StreamTableEnvironment]] of the translated Table. - * @return DataStream of type [[Row]] + * @return DataStream of type [[CRow]] */ - def translateToPlan(tableEnv: StreamTableEnvironment) : DataStream[Row] + def translateToPlan(tableEnv: StreamTableEnvironment) : DataStream[CRow] /** * Whether the [[DataStreamRel]] requires that update and delete changes are sent with retraction http://git-wip-us.apache.org/repos/asf/flink/blob/27bf4cab/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamScan.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamScan.scala index 05f60ba..c613646 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamScan.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamScan.scala @@ -24,8 +24,9 @@ import org.apache.calcite.rel.`type`.RelDataType import org.apache.calcite.rel.core.TableScan import org.apache.flink.streaming.api.datastream.DataStream import org.apache.flink.table.api.StreamTableEnvironment -import org.apache.flink.table.plan.schema.{DataStreamTable, RowSchema} -import org.apache.flink.types.Row +import org.apache.flink.table.plan.schema.RowSchema +import org.apache.flink.table.plan.schema.DataStreamTable +import org.apache.flink.table.runtime.types.CRow /** * Flink RelNode which matches along with DataStreamSource. @@ -53,7 +54,7 @@ class DataStreamScan( ) } - override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[Row] = { + override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[CRow] = { val config = tableEnv.getConfig val inputDataStream: DataStream[Any] = dataStreamTable.dataStream convertToInternalRow(schema, inputDataStream, dataStreamTable, config) http://git-wip-us.apache.org/repos/asf/flink/blob/27bf4cab/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamUnion.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamUnion.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamUnion.scala index 47b4946..654c259 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamUnion.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamUnion.scala @@ -23,7 +23,7 @@ import org.apache.calcite.rel.{BiRel, RelNode, RelWriter} import org.apache.flink.streaming.api.datastream.DataStream import org.apache.flink.table.api.StreamTableEnvironment import org.apache.flink.table.plan.schema.RowSchema -import org.apache.flink.types.Row +import org.apache.flink.table.runtime.types.CRow /** * Flink RelNode which matches along with Union. @@ -58,7 +58,7 @@ class DataStreamUnion( s"Union All(union: (${schema.logicalFieldNames.mkString(", ")}))" } - override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[Row] = { + override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[CRow] = { val leftDataSet = left.asInstanceOf[DataStreamRel].translateToPlan(tableEnv) val rightDataSet = right.asInstanceOf[DataStreamRel].translateToPlan(tableEnv) http://git-wip-us.apache.org/repos/asf/flink/blob/27bf4cab/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamValues.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamValues.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamValues.scala index c964e03..32c9aaf 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamValues.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamValues.scala @@ -27,8 +27,8 @@ import org.apache.flink.streaming.api.datastream.DataStream import org.apache.flink.table.api.StreamTableEnvironment import org.apache.flink.table.codegen.CodeGenerator import org.apache.flink.table.plan.schema.RowSchema -import org.apache.flink.table.runtime.io.ValuesInputFormat -import org.apache.flink.types.Row +import org.apache.flink.table.runtime.io.CRowValuesInputFormat +import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo} import scala.collection.JavaConverters._ @@ -56,10 +56,11 @@ class DataStreamValues( ) } - override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[Row] = { + override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[CRow] = { val config = tableEnv.getConfig + val returnType = CRowTypeInfo(schema.physicalTypeInfo) val generator = new CodeGenerator(config) // generate code for every record @@ -76,12 +77,12 @@ class DataStreamValues( generatedRecords.map(_.code), schema.physicalTypeInfo) - val inputFormat = new ValuesInputFormat[Row]( + val inputFormat = new CRowValuesInputFormat( generatedFunction.name, generatedFunction.code, - generatedFunction.returnType) + returnType) - tableEnv.execEnv.createInput(inputFormat, schema.physicalTypeInfo) + tableEnv.execEnv.createInput(inputFormat, returnType) } } http://git-wip-us.apache.org/repos/asf/flink/blob/27bf4cab/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamScan.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamScan.scala index dd82819..25e72fa 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamScan.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamScan.scala @@ -22,46 +22,51 @@ import org.apache.flink.api.common.functions.MapFunction import org.apache.flink.streaming.api.datastream.DataStream import org.apache.flink.table.api.TableConfig import org.apache.flink.table.plan.nodes.CommonScan -import org.apache.flink.table.plan.schema.{FlinkTable, RowSchema} -import org.apache.flink.table.runtime.MapRunner +import org.apache.flink.table.plan.schema.RowSchema import org.apache.flink.types.Row +import org.apache.flink.table.plan.schema.FlinkTable +import org.apache.flink.table.runtime.CRowOutputMapRunner +import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo} import scala.collection.JavaConverters._ -trait StreamScan extends CommonScan with DataStreamRel { +trait StreamScan extends CommonScan[CRow] with DataStreamRel { protected def convertToInternalRow( schema: RowSchema, input: DataStream[Any], flinkTable: FlinkTable[_], config: TableConfig) - : DataStream[Row] = { + : DataStream[CRow] = { + + val inputType = input.getType + val internalType = CRowTypeInfo(schema.physicalTypeInfo) // conversion - if (needsConversion(input.getType, schema.physicalTypeInfo)) { + if (needsConversion(input.getType, internalType)) { val function = generatedConversionFunction( config, classOf[MapFunction[Any, Row]], - input.getType, + inputType, schema.physicalTypeInfo, "DataStreamSourceConversion", schema.physicalFieldNames, Some(flinkTable.fieldIndexes)) - val runner = new MapRunner[Any, Row]( + val mapFunc = new CRowOutputMapRunner( function.name, function.code, - function.returnType) + internalType) - val opName = s"from: (${schema.logicalFieldNames.mkString(", ")})" + val opName = s"from: (${getRowType.getFieldNames.asScala.toList.mkString(", ")})" // TODO we need a ProcessFunction here - input.map(runner).name(opName) + input.map(mapFunc).name(opName).returns(internalType) } // no conversion necessary, forward else { - input.asInstanceOf[DataStream[Row]] + input.asInstanceOf[DataStream[CRow]] } } } http://git-wip-us.apache.org/repos/asf/flink/blob/27bf4cab/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala index e34e416..b2d7019 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala @@ -25,9 +25,11 @@ import org.apache.flink.streaming.api.datastream.DataStream import org.apache.flink.table.api.{StreamTableEnvironment, TableEnvironment} import org.apache.flink.table.calcite.FlinkTypeFactory import org.apache.flink.table.plan.nodes.PhysicalTableSourceScan -import org.apache.flink.table.plan.schema.{RowSchema, TableSourceTable} +import org.apache.flink.table.plan.schema.RowSchema import org.apache.flink.table.sources._ -import org.apache.flink.types.Row +import org.apache.flink.table.plan.schema.TableSourceTable +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.table.sources.{StreamTableSource, TableSource} /** Flink RelNode to read data from an external source defined by a [[StreamTableSource]]. */ class StreamTableSourceScan( @@ -96,7 +98,7 @@ class StreamTableSourceScan( ) } - override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[Row] = { + override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[CRow] = { val config = tableEnv.getConfig val inputDataStream = tableSource.getDataStream(tableEnv.execEnv).asInstanceOf[DataStream[Any]] convertToInternalRow( http://git-wip-us.apache.org/repos/asf/flink/blob/27bf4cab/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalCalc.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalCalc.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalCalc.scala index ec90392..0ca079e 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalCalc.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalCalc.scala @@ -34,7 +34,7 @@ class FlinkLogicalCalc( calcProgram: RexProgram) extends Calc(cluster, traitSet, input, calcProgram) with FlinkLogicalRel - with CommonCalc { + with CommonCalc[Any] { override def copy(traitSet: RelTraitSet, child: RelNode, program: RexProgram): Calc = { new FlinkLogicalCalc(cluster, traitSet, child, program) http://git-wip-us.apache.org/repos/asf/flink/blob/27bf4cab/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamRetractionRules.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamRetractionRules.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamRetractionRules.scala index aeb67b6..bd9a7ee 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamRetractionRules.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamRetractionRules.scala @@ -82,6 +82,14 @@ object DataStreamRetractionRules { } /** + * Checks if a [[RelNode]] is in [[AccMode.AccRetract]] mode. + */ + def isAccRetract(node: RelNode): Boolean = { + val accModeTrait = node.getTraitSet.getTrait(AccModeTraitDef.INSTANCE) + null != accModeTrait && accModeTrait.getAccMode == AccMode.AccRetract + } + + /** * Rule that assigns the default retraction information to [[DataStreamRel]] nodes. * The default is to not publish updates as retraction messages and [[AccMode.Acc]]. */ @@ -190,14 +198,6 @@ object DataStreamRetractionRules { } /** - * Checks if a [[RelNode]] is in [[AccMode.AccRetract]] mode. - */ - def isAccRetract(node: RelNode): Boolean = { - val accModeTrait = node.getTraitSet.getTrait(AccModeTraitDef.INSTANCE) - null != accModeTrait && accModeTrait.getAccMode == AccMode.AccRetract - } - - /** * Set [[AccMode.AccRetract]] to a [[RelNode]]. */ def setAccRetract(relNode: RelNode): RelNode = { http://git-wip-us.apache.org/repos/asf/flink/blob/27bf4cab/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowCorrelateFlatMapRunner.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowCorrelateFlatMapRunner.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowCorrelateFlatMapRunner.scala new file mode 100644 index 0000000..66e51b1 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowCorrelateFlatMapRunner.scala @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime + +import org.apache.flink.api.common.functions.{FlatMapFunction, RichFlatMapFunction} +import org.apache.flink.api.common.functions.util.FunctionUtils +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.typeutils.ResultTypeQueryable +import org.apache.flink.configuration.Configuration +import org.apache.flink.table.codegen.Compiler +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.types.Row +import org.apache.flink.util.Collector +import org.slf4j.{Logger, LoggerFactory} + +/** + * A CorrelateFlatMapRunner with [[CRow]] input and [[CRow]] output. + */ +class CRowCorrelateFlatMapRunner( + flatMapName: String, + flatMapCode: String, + collectorName: String, + collectorCode: String, + @transient returnType: TypeInformation[CRow]) + extends RichFlatMapFunction[CRow, CRow] + with ResultTypeQueryable[CRow] + with Compiler[Any] { + + val LOG: Logger = LoggerFactory.getLogger(this.getClass) + + private var function: FlatMapFunction[Row, Row] = _ + private var collector: TableFunctionCollector[_] = _ + private var cRowWrapper: CRowWrappingCollector = _ + + override def open(parameters: Configuration): Unit = { + LOG.debug(s"Compiling TableFunctionCollector: $collectorName \n\n Code:\n$collectorCode") + val clazz = compile(getRuntimeContext.getUserCodeClassLoader, collectorName, collectorCode) + LOG.debug("Instantiating TableFunctionCollector.") + collector = clazz.newInstance().asInstanceOf[TableFunctionCollector[_]] + this.cRowWrapper = new CRowWrappingCollector() + + LOG.debug(s"Compiling FlatMapFunction: $flatMapName \n\n Code:\n$flatMapCode") + val flatMapClazz = compile(getRuntimeContext.getUserCodeClassLoader, flatMapName, flatMapCode) + val constructor = flatMapClazz.getConstructor(classOf[TableFunctionCollector[_]]) + LOG.debug("Instantiating FlatMapFunction.") + function = constructor.newInstance(collector).asInstanceOf[FlatMapFunction[Row, Row]] + FunctionUtils.setFunctionRuntimeContext(function, getRuntimeContext) + FunctionUtils.openFunction(function, parameters) + } + + override def flatMap(in: CRow, out: Collector[CRow]): Unit = { + cRowWrapper.out = out + cRowWrapper.setChange(in.change) + + collector.setCollector(cRowWrapper) + collector.setInput(in.row) + collector.reset() + + function.flatMap(in.row, cRowWrapper) + } + + override def getProducedType: TypeInformation[CRow] = returnType + + override def close(): Unit = { + FunctionUtils.closeFunction(function) + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/27bf4cab/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowFlatMapRunner.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowFlatMapRunner.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowFlatMapRunner.scala new file mode 100644 index 0000000..9a4650b --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowFlatMapRunner.scala @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime + +import org.apache.flink.api.common.functions.util.FunctionUtils +import org.apache.flink.api.common.functions.{FlatMapFunction, RichFlatMapFunction} +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.typeutils.ResultTypeQueryable +import org.apache.flink.configuration.Configuration +import org.apache.flink.table.codegen.Compiler +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.types.Row +import org.apache.flink.util.Collector +import org.slf4j.LoggerFactory + +/** + * FlatMapRunner with [[CRow]] input and [[CRow]] output. + */ +class CRowFlatMapRunner( + name: String, + code: String, + @transient returnType: TypeInformation[CRow]) + extends RichFlatMapFunction[CRow, CRow] + with ResultTypeQueryable[CRow] + with Compiler[FlatMapFunction[Row, Row]] { + + val LOG = LoggerFactory.getLogger(this.getClass) + + private var function: FlatMapFunction[Row, Row] = _ + private var cRowWrapper: CRowWrappingCollector = _ + + override def open(parameters: Configuration): Unit = { + LOG.debug(s"Compiling FlatMapFunction: $name \n\n Code:\n$code") + val clazz = compile(getRuntimeContext.getUserCodeClassLoader, name, code) + LOG.debug("Instantiating FlatMapFunction.") + function = clazz.newInstance() + FunctionUtils.setFunctionRuntimeContext(function, getRuntimeContext) + FunctionUtils.openFunction(function, parameters) + + this.cRowWrapper = new CRowWrappingCollector() + } + + override def flatMap(in: CRow, out: Collector[CRow]): Unit = { + cRowWrapper.out = out + cRowWrapper.setChange(in.change) + function.flatMap(in.row, cRowWrapper) + } + + override def getProducedType: TypeInformation[CRow] = returnType + + override def close(): Unit = { + FunctionUtils.closeFunction(function) + } +} + +