[FLINK-6093] [table] Add stream TableSinks and DataStream conversion with support for retraction.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f37988c1 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f37988c1 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f37988c1 Branch: refs/heads/master Commit: f37988c19adc30d324cde83c54f2fa5d36efb9e7 Parents: bfed279 Author: Fabian Hueske <fhue...@apache.org> Authored: Fri Apr 28 01:59:57 2017 +0200 Committer: Fabian Hueske <fhue...@apache.org> Committed: Sat May 6 01:51:55 2017 +0200 ---------------------------------------------------------------------- .../connectors/kafka/KafkaTableSink.java | 6 +- .../flink/table/api/BatchTableEnvironment.scala | 16 +- .../table/api/StreamTableEnvironment.scala | 428 +++++++++++----- .../apache/flink/table/api/TableConfig.scala | 17 - .../flink/table/api/TableEnvironment.scala | 48 +- .../table/api/java/StreamTableEnvironment.scala | 81 ++- .../api/scala/StreamTableEnvironment.scala | 33 +- .../table/api/scala/TableConversions.scala | 19 + .../org/apache/flink/table/api/table.scala | 4 +- .../plan/nodes/datastream/DataStreamCalc.scala | 8 +- .../datastream/DataStreamGroupAggregate.scala | 2 + .../DataStreamGroupWindowAggregate.scala | 4 + .../nodes/datastream/retractionTraits.scala | 37 +- .../datastream/DataStreamRetractionRules.scala | 16 +- .../runtime/CRowCorrelateFlatMapRunner.scala | 2 +- .../flink/table/runtime/CRowFlatMapRunner.scala | 2 +- .../table/runtime/CRowInputMapRunner.scala | 2 +- .../runtime/CRowInputTupleOutputMapRunner.scala | 53 +- .../table/runtime/CRowOutputMapRunner.scala | 2 +- .../table/runtime/CorrelateFlatMapRunner.scala | 2 +- .../flink/table/runtime/FlatJoinRunner.scala | 2 +- .../flink/table/runtime/FlatMapRunner.scala | 2 +- .../flink/table/runtime/MapJoinLeftRunner.scala | 2 +- .../table/runtime/MapJoinRightRunner.scala | 2 +- .../apache/flink/table/runtime/MapRunner.scala | 2 +- .../flink/table/runtime/MapSideJoinRunner.scala | 2 +- 
...aSetSessionWindowAggregatePreProcessor.scala | 2 +- .../aggregate/GroupAggProcessFunction.scala | 55 +- .../runtime/io/CRowValuesInputFormat.scala | 2 +- .../table/runtime/io/ValuesInputFormat.scala | 2 +- .../table/sinks/AppendStreamTableSink.scala | 36 ++ .../apache/flink/table/sinks/CsvTableSink.scala | 100 +--- .../table/sinks/RetractStreamTableSink.scala | 55 ++ .../flink/table/sinks/StreamRetractSink.scala | 35 -- .../flink/table/sinks/StreamTableSink.scala | 32 -- .../table/sinks/UpsertStreamTableSink.scala | 79 +++ .../flink/table/TableEnvironmentTest.scala | 1 - .../api/scala/stream/RetractionITCase.scala | 80 ++- .../api/scala/stream/TableSinkITCase.scala | 33 +- .../table/api/scala/stream/sql/SqlITCase.scala | 102 ++-- .../stream/table/GroupAggregationsITCase.scala | 61 +-- .../api/scala/stream/table/OverWindowTest.scala | 18 +- .../api/scala/stream/utils/StreamITCase.scala | 35 +- .../table/plan/rules/RetractionRulesTest.scala | 2 +- .../table/sinks/StreamTableSinksITCase.scala | 511 +++++++++++++++++++ .../table/utils/MockTableEnvironment.scala | 5 - .../flink/table/utils/TableTestBase.scala | 4 +- 47 files changed, 1433 insertions(+), 611 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/f37988c1/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java index 97f5fba..a8a2fd0 100644 --- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java +++ 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java @@ -20,7 +20,7 @@ package org.apache.flink.streaming.connectors.kafka; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.types.Row; import org.apache.flink.api.java.typeutils.RowTypeInfo; -import org.apache.flink.table.sinks.StreamTableSink; +import org.apache.flink.table.sinks.AppendStreamTableSink; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner; import org.apache.flink.streaming.util.serialization.SerializationSchema; @@ -29,12 +29,12 @@ import org.apache.flink.util.Preconditions; import java.util.Properties; /** - * A version-agnostic Kafka {@link StreamTableSink}. + * A version-agnostic Kafka {@link AppendStreamTableSink}. * * <p>The version-specific Kafka consumers need to extend this class and * override {@link #createKafkaProducer(String, Properties, SerializationSchema, KafkaPartitioner)}}. 
*/ -public abstract class KafkaTableSink implements StreamTableSink<Row> { +public abstract class KafkaTableSink implements AppendStreamTableSink<Row> { protected final String topic; protected final Properties properties; http://git-wip-us.apache.org/repos/asf/flink/blob/f37988c1/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala index c7bacfe..2a3cedf 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala @@ -36,7 +36,7 @@ import org.apache.flink.table.expressions.{Expression, RowtimeAttribute, TimeAtt import org.apache.flink.table.plan.nodes.FlinkConventions import org.apache.flink.table.plan.nodes.dataset.DataSetRel import org.apache.flink.table.plan.rules.FlinkRuleSets -import org.apache.flink.table.plan.schema.{DataSetTable, TableSourceTable} +import org.apache.flink.table.plan.schema.{DataSetTable, RowSchema, TableSourceTable} import org.apache.flink.table.runtime.MapRunner import org.apache.flink.table.sinks.{BatchTableSink, TableSink} import org.apache.flink.table.sources.{BatchTableSource, TableSource} @@ -133,14 +133,14 @@ abstract class BatchTableEnvironment( * Creates a final converter that maps the internal row type to external type. * * @param physicalTypeInfo the input of the sink - * @param logicalRowType the logical type with correct field names (esp. for POJO field mapping) + * @param schema the input schema with correct field names (esp. for POJO field mapping) * @param requestedTypeInfo the output type of the sink * @param functionName name of the map function. 
Must not be unique but has to be a * valid Java class identifier. */ - override protected def getConversionMapper[IN, OUT]( + protected def getConversionMapper[IN, OUT]( physicalTypeInfo: TypeInformation[IN], - logicalRowType: RelDataType, + schema: RowSchema, requestedTypeInfo: TypeInformation[OUT], functionName: String): Option[MapFunction[IN, OUT]] = { @@ -153,7 +153,7 @@ abstract class BatchTableEnvironment( val converterFunction = generateRowConverterFunction[OUT]( physicalTypeInfo.asInstanceOf[TypeInformation[Row]], - logicalRowType, + schema, requestedTypeInfo, functionName ) @@ -334,7 +334,11 @@ abstract class BatchTableEnvironment( case node: DataSetRel => val plan = node.translateToPlan(this) val conversion = - getConversionMapper(plan.getType, logicalType, tpe, "DataSetSinkConversion") + getConversionMapper( + plan.getType, + new RowSchema(logicalType), + tpe, + "DataSetSinkConversion") conversion match { case None => plan.asInstanceOf[DataSet[A]] // no conversion necessary case Some(mapFunction: MapFunction[Row, A]) => http://git-wip-us.apache.org/repos/asf/flink/blob/f37988c1/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala index bd06305..aef2b1b 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala @@ -19,19 +19,23 @@ package org.apache.flink.table.api import _root_.java.util.concurrent.atomic.AtomicInteger +import _root_.java.lang.{Boolean => JBool} import org.apache.calcite.plan.RelOptUtil import org.apache.calcite.plan.hep.HepMatchOrder -import 
org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.{RelNode, RelVisitor} import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.rex.{RexCall, RexInputRef, RexNode} +import org.apache.calcite.sql.SqlKind import org.apache.calcite.sql2rel.RelDecorrelator import org.apache.calcite.tools.{RuleSet, RuleSets} import org.apache.flink.api.common.typeinfo.AtomicType import org.apache.flink.api.common.typeutils.CompositeType import org.apache.flink.api.common.functions.MapFunction -import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} +import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2} -import org.apache.flink.api.java.typeutils.{GenericTypeInfo, TupleTypeInfo} +import org.apache.flink.api.java.typeutils.TupleTypeInfo +import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo import org.apache.flink.streaming.api.datastream.DataStream import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment import org.apache.flink.table.calcite.RelTimeIndicatorConverter @@ -39,12 +43,12 @@ import org.apache.flink.table.explain.PlanJsonParser import org.apache.flink.table.expressions.{Expression, ProctimeAttribute, RowtimeAttribute, UnresolvedFieldReference} import org.apache.flink.table.plan.nodes.FlinkConventions import org.apache.flink.table.plan.nodes.datastream.{DataStreamRel, UpdateAsRetractionTrait} +import org.apache.flink.table.plan.nodes.datastream._ import org.apache.flink.table.plan.rules.FlinkRuleSets -import org.apache.flink.table.plan.schema.StreamTableSourceTable -import org.apache.flink.table.plan.schema.DataStreamTable -import org.apache.flink.table.runtime.{CRowInputMapRunner, CRowInputTupleOutputMapRunner} +import org.apache.flink.table.plan.schema.{DataStreamTable, RowSchema, StreamTableSourceTable} +import org.apache.flink.table.runtime.{CRowInputJavaTupleOutputMapRunner, CRowInputMapRunner, CRowInputScalaTupleOutputMapRunner} 
import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo} -import org.apache.flink.table.sinks.{StreamRetractSink, StreamTableSink, TableSink} +import org.apache.flink.table.sinks.{AppendStreamTableSink, RetractStreamTableSink, TableSink, UpsertStreamTableSink} import org.apache.flink.table.sources.{StreamTableSource, TableSource} import org.apache.flink.table.typeutils.TypeCheckUtils import org.apache.flink.types.Row @@ -127,97 +131,192 @@ abstract class StreamTableEnvironment( override private[flink] def writeToSink[T](table: Table, sink: TableSink[T]): Unit = { sink match { - case streamSink: StreamTableSink[T] => + + case retractSink: RetractStreamTableSink[_] => + // retraction sink can always be used val outputType = sink.getOutputType // translate the Table into a DataStream and provide the type that the TableSink expects. - val result: DataStream[T] = translate(table)(outputType) - // Give the DataSet to the TableSink to emit it. - streamSink.emitDataStream(result) - - case streamRetractSink: StreamRetractSink[T] => + val result: DataStream[T] = + translate(table, updatesAsRetraction = true, withChangeFlag = true)(outputType) + // Give the DataStream to the TableSink to emit it. 
+ retractSink.asInstanceOf[RetractStreamTableSink[Any]] + .emitDataStream(result.asInstanceOf[DataStream[JTuple2[JBool, Any]]]) + + case upsertSink: UpsertStreamTableSink[_] => + // optimize plan + val optimizedPlan = optimize(table.getRelNode, updatesAsRetraction = false) + // check for append only table + val isAppendOnlyTable = isAppendOnly(optimizedPlan) + upsertSink.setIsAppendOnly(isAppendOnlyTable) + // extract unique key fields + val tableKeys: Option[Array[String]] = getUniqueKeyFields(optimizedPlan) + // check that we have keys if the table has changes (is not append-only) + tableKeys match { + case Some(keys) => upsertSink.setKeyFields(keys) + case None if isAppendOnlyTable => upsertSink.setKeyFields(null) + case None if !isAppendOnlyTable => throw new TableException( + "UpsertStreamTableSink requires that Table has a full primary keys if it is updated.") + } + val outputType = sink.getOutputType + // translate the Table into a DataStream and provide the type that the TableSink expects. + val result: DataStream[T] = + translate(optimizedPlan, table.getRelNode.getRowType, withChangeFlag = true)(outputType) + // Give the DataStream to the TableSink to emit it. + upsertSink.asInstanceOf[UpsertStreamTableSink[Any]] + .emitDataStream(result.asInstanceOf[DataStream[JTuple2[JBool, Any]]]) + + case appendSink: AppendStreamTableSink[_] => + // optimize plan + val optimizedPlan = optimize(table.getRelNode, updatesAsRetraction = false) + // verify table is an insert-only (append-only) table + if (!isAppendOnly(optimizedPlan)) { + throw new TableException( + "AppendStreamTableSink requires that Table has only insert changes.") + } val outputType = sink.getOutputType - this.config.setNeedsUpdatesAsRetractionForSink(streamRetractSink.needsUpdatesAsRetraction) // translate the Table into a DataStream and provide the type that the TableSink expects. 
- val result: DataStream[JTuple2[Boolean, T]] = translate(table, true)(outputType) - // Give the DataSet to the TableSink to emit it. - streamRetractSink.emitDataStreamWithChange(result) + val result: DataStream[T] = + translate(optimizedPlan, table.getRelNode.getRowType, withChangeFlag = false)(outputType) + // Give the DataStream to the TableSink to emit it. + appendSink.asInstanceOf[AppendStreamTableSink[T]].emitDataStream(result) + case _ => - throw new TableException("StreamTableSink required to emit streaming Table") + throw new TableException("Stream Tables can only be emitted by AppendStreamTableSink, " + + "RetractStreamTableSink, or UpsertStreamTableSink.") } } - /** * Creates a final converter that maps the internal row type to external type. * * @param physicalTypeInfo the input of the sink - * @param logicalRowType the logical type with correct field names (esp. for POJO field mapping) + * @param schema the input schema with correct field names (esp. for POJO field mapping) * @param requestedTypeInfo the output type of the sink * @param functionName name of the map function. Must not be unique but has to be a * valid Java class identifier. 
*/ - override protected def getConversionMapper[IN, OUT]( + protected def getConversionMapper[IN, OUT]( physicalTypeInfo: TypeInformation[IN], - logicalRowType: RelDataType, + schema: RowSchema, requestedTypeInfo: TypeInformation[OUT], functionName: String): - Option[MapFunction[IN, OUT]] = { + MapFunction[IN, OUT] = { - if (requestedTypeInfo.getTypeClass == classOf[CRow]) { - // only used to explain table - None - } else if (requestedTypeInfo.getTypeClass == classOf[Row]) { + if (requestedTypeInfo.getTypeClass == classOf[Row]) { // CRow to Row, only needs to be unwrapped - Some( - new MapFunction[CRow, Row] { - override def map(value: CRow): Row = value.row - }.asInstanceOf[MapFunction[IN, OUT]] - ) + new MapFunction[CRow, Row] { + override def map(value: CRow): Row = value.row + }.asInstanceOf[MapFunction[IN, OUT]] } else { // Some type that is neither CRow nor Row val converterFunction = generateRowConverterFunction[OUT]( physicalTypeInfo.asInstanceOf[CRowTypeInfo].rowType, - logicalRowType, + schema, requestedTypeInfo, functionName ) - Some(new CRowInputMapRunner[OUT]( + new CRowInputMapRunner[OUT]( converterFunction.name, converterFunction.code, converterFunction.returnType) - .asInstanceOf[MapFunction[IN, OUT]]) + .asInstanceOf[MapFunction[IN, OUT]] } } + /** Validates that the plan produces only append changes. */ + protected def isAppendOnly(plan: RelNode): Boolean = { + val appendOnlyValidator = new AppendOnlyValidator + appendOnlyValidator.go(plan) + + appendOnlyValidator.isAppendOnly + } + + /** Extracts the unique keys of the table produced by the plan. */ + protected def getUniqueKeyFields(plan: RelNode): Option[Array[String]] = { + val keyExtractor = new UniqueKeyExtractor + keyExtractor.go(plan) + keyExtractor.keys + } + /** - * Creates a final converter that maps the internal CRow type to external Tuple2 type. + * Creates a converter that maps the internal CRow type to Scala or Java Tuple2 with change flag. 
* * @param physicalTypeInfo the input of the sink - * @param logicalRowType the logical type with correct field names (esp. for POJO field mapping) - * @param requestedTypeInfo the output type of the sink + * @param schema the input schema with correct field names (esp. for POJO field mapping) + * @param requestedTypeInfo the output type of the sink. * @param functionName name of the map function. Must not be unique but has to be a * valid Java class identifier. */ - protected def getTupleConversionMapper[IN, OUT]( - physicalTypeInfo: TypeInformation[IN], - logicalRowType: RelDataType, - requestedTypeInfo: TypeInformation[OUT], - functionName: String): - Option[MapFunction[IN, JTuple2[Boolean, OUT]]] = { + private def getConversionMapperWithChanges[OUT]( + physicalTypeInfo: TypeInformation[CRow], + schema: RowSchema, + requestedTypeInfo: TypeInformation[OUT], + functionName: String): + MapFunction[CRow, OUT] = { + + requestedTypeInfo match { + + // Scala tuple + case t: CaseClassTypeInfo[_] + if t.getTypeClass == classOf[(_, _)] && t.getTypeAt(0) == Types.BOOLEAN => + + val reqType = t.getTypeAt(1).asInstanceOf[TypeInformation[Any]] + if (reqType.getTypeClass == classOf[Row]) { + // Requested type is Row. 
Just rewrap CRow in Tuple2 + new MapFunction[CRow, (Boolean, Row)] { + override def map(cRow: CRow): (Boolean, Row) = { + (cRow.change, cRow.row) + } + }.asInstanceOf[MapFunction[CRow, OUT]] + } else { + // Use a map function to convert Row into requested type and wrap result in Tuple2 + val converterFunction = generateRowConverterFunction( + physicalTypeInfo.asInstanceOf[CRowTypeInfo].rowType, + schema, + reqType, + functionName + ) + + new CRowInputScalaTupleOutputMapRunner( + converterFunction.name, + converterFunction.code, + requestedTypeInfo.asInstanceOf[TypeInformation[(Boolean, Any)]]) + .asInstanceOf[MapFunction[CRow, OUT]] - val converterFunction = generateRowConverterFunction( - physicalTypeInfo.asInstanceOf[CRowTypeInfo].rowType, - logicalRowType, - requestedTypeInfo, - functionName - ) + } - Some(new CRowInputTupleOutputMapRunner[OUT]( - converterFunction.name, - converterFunction.code, - new TupleTypeInfo(BasicTypeInfo.BOOLEAN_TYPE_INFO, requestedTypeInfo)) - .asInstanceOf[MapFunction[IN, JTuple2[Boolean, OUT]]]) + // Java tuple + case t: TupleTypeInfo[_] + if t.getTypeClass == classOf[JTuple2[_, _]] && t.getTypeAt(0) == Types.BOOLEAN => + + val reqType = t.getTypeAt(1).asInstanceOf[TypeInformation[Any]] + if (reqType.getTypeClass == classOf[Row]) { + // Requested type is Row. 
Just rewrap CRow in Tuple2 + new MapFunction[CRow, JTuple2[JBool, Row]] { + val outT = new JTuple2(true.asInstanceOf[JBool], null.asInstanceOf[Row]) + override def map(cRow: CRow): JTuple2[JBool, Row] = { + outT.f0 = cRow.change + outT.f1 = cRow.row + outT + } + }.asInstanceOf[MapFunction[CRow, OUT]] + } else { + // Use a map function to convert Row into requested type and wrap result in Tuple2 + val converterFunction = generateRowConverterFunction( + physicalTypeInfo.asInstanceOf[CRowTypeInfo].rowType, + schema, + reqType, + functionName + ) + + new CRowInputJavaTupleOutputMapRunner( + converterFunction.name, + converterFunction.code, + requestedTypeInfo.asInstanceOf[TypeInformation[JTuple2[JBool, Any]]]) + .asInstanceOf[MapFunction[CRow, OUT]] + } + } } /** @@ -380,9 +479,10 @@ abstract class StreamTableEnvironment( * Generates the optimized [[RelNode]] tree from the original relational node tree. * * @param relNode The root node of the relational expression tree. + * @param updatesAsRetraction True if the sink requests updates as retraction messages. * @return The optimized [[RelNode]] tree */ - private[flink] def optimize(relNode: RelNode): RelNode = { + private[flink] def optimize(relNode: RelNode, updatesAsRetraction: Boolean): RelNode = { // 1. decorrelate val decorPlan = RelDecorrelator.decorrelateQuery(relNode) @@ -410,7 +510,7 @@ abstract class StreamTableEnvironment( // 5. optimize the physical Flink plan val physicalOptRuleSet = getPhysicalOptRuleSet val physicalOutputProps = relNode.getTraitSet.replace(FlinkConventions.DATASTREAM).simplify() - var physicalPlan = if (physicalOptRuleSet.iterator().hasNext) { + val physicalPlan = if (physicalOptRuleSet.iterator().hasNext) { runVolcanoPlanner(physicalOptRuleSet, logicalPlan, physicalOutputProps) } else { logicalPlan @@ -419,13 +519,18 @@ abstract class StreamTableEnvironment( // 6. 
decorate the optimized plan val decoRuleSet = getDecoRuleSet val decoratedPlan = if (decoRuleSet.iterator().hasNext) { - - if (this.config.getNeedsUpdatesAsRetractionForSink) { - physicalPlan = physicalPlan.copy( + val planToDecorate = if (updatesAsRetraction) { + physicalPlan.copy( physicalPlan.getTraitSet.plus(new UpdateAsRetractionTrait(true)), physicalPlan.getInputs) + } else { + physicalPlan } - runHepPlanner(HepMatchOrder.BOTTOM_UP, decoRuleSet, physicalPlan, physicalPlan.getTraitSet) + runHepPlanner( + HepMatchOrder.BOTTOM_UP, + decoRuleSet, + planToDecorate, + planToDecorate.getTraitSet) } else { physicalPlan } @@ -440,14 +545,17 @@ abstract class StreamTableEnvironment( * Table API calls and / or SQL queries and generating corresponding [[DataStream]] operators. * * @param table The root node of the relational expression tree. + * @param updatesAsRetraction Set to true to encode updates as retraction messages. + * @param withChangeFlag Set to true to emit records with change flags. * @param tpe The [[TypeInformation]] of the resulting [[DataStream]]. * @tparam A The type of the resulting [[DataStream]]. * @return The [[DataStream]] that corresponds to the translated [[Table]]. */ - protected def translate[A](table: Table)(implicit tpe: TypeInformation[A]): DataStream[A] = { + protected def translate[A](table: Table, updatesAsRetraction: Boolean, withChangeFlag: Boolean) + (implicit tpe: TypeInformation[A]): DataStream[A] = { val relNode = table.getRelNode - val dataStreamPlan = optimize(relNode) - translate(dataStreamPlan, relNode.getRowType) + val dataStreamPlan = optimize(relNode, updatesAsRetraction) + translate(dataStreamPlan, relNode.getRowType, withChangeFlag) } /** @@ -456,87 +564,65 @@ abstract class StreamTableEnvironment( * @param logicalPlan The root node of the relational expression tree. * @param logicalType The row type of the result. Since the logicalPlan can lose the * field naming during optimization we pass the row type separately. 
+ * @param withChangeFlag Set to true to emit records with change flags. * @param tpe The [[TypeInformation]] of the resulting [[DataStream]]. * @tparam A The type of the resulting [[DataStream]]. * @return The [[DataStream]] that corresponds to the translated [[Table]]. */ protected def translate[A]( logicalPlan: RelNode, - logicalType: RelDataType) + logicalType: RelDataType, + withChangeFlag: Boolean) (implicit tpe: TypeInformation[A]): DataStream[A] = { - TableEnvironment.validateType(tpe) + // if no change flags are requested, verify table is an insert-only (append-only) table. + if (!withChangeFlag && !isAppendOnly(logicalPlan)) { + throw new TableException( + "Table is not an append-only table. " + + "Output needs to handle update and delete changes.") + } - logicalPlan match { - case node: DataStreamRel => - val plan = node.translateToPlan(this) - val conversion = - getConversionMapper(plan.getType, logicalType, tpe, "DataStreamSinkConversion") - conversion match { - case None => plan.asInstanceOf[DataStream[A]] // no conversion necessary - case Some(mapFunction: MapFunction[CRow, A]) => - plan.map(mapFunction) - .returns(tpe) - .name(s"to: ${tpe.getTypeClass.getSimpleName}") - .asInstanceOf[DataStream[A]] - } + // get CRow plan + val plan: DataStream[CRow] = translateToCRow(logicalPlan) - case _ => - throw TableException("Cannot generate DataStream due to an invalid logical plan. " + - "This is a bug and should not happen. Please file an issue.") + // convert CRow to output type + val conversion = if (withChangeFlag) { + getConversionMapperWithChanges( + plan.getType, + new RowSchema(logicalType), + tpe, + "DataStreamSinkConversion") + } else { + getConversionMapper( + plan.getType, + new RowSchema(logicalType), + tpe, + "DataStreamSinkConversion") } - } - /** - * Translates a [[Table]] into a [[DataStream]] with change information. 
- * - * The transformation involves optimizing the relational expression tree as defined by - * Table API calls and / or SQL queries and generating corresponding [[DataStream]] operators. - * - * @param table The root node of the relational expression tree. - * @param wrapToTuple True, if want to output chang information - * @param tpe The [[TypeInformation]] of the resulting [[DataStream]]. - * @tparam A The type of the resulting [[DataStream]]. - * @return The [[DataStream]] that corresponds to the translated [[Table]]. - */ - protected def translate[A](table: Table, wrapToTuple: Boolean)(implicit tpe: TypeInformation[A]) - : DataStream[JTuple2[Boolean, A]] = { - val relNode = table.getRelNode - val dataStreamPlan = optimize(relNode) - translate(dataStreamPlan, relNode.getRowType, wrapToTuple) + val rootParallelism = plan.getParallelism + + conversion match { + case mapFunction: MapFunction[CRow, A] => + plan.map(mapFunction) + .returns(tpe) + .name(s"to: ${tpe.getTypeClass.getSimpleName}") + .setParallelism(rootParallelism) + } } /** - * Translates a logical [[RelNode]] into a [[DataStream]] with change information. + * Translates a logical [[RelNode]] plan into a [[DataStream]] of type [[CRow]]. * - * @param logicalPlan The root node of the relational expression tree. - * @param logicalType The row type of the result. Since the logicalPlan can lose the - * @param wrapToTuple True, if want to output chang information - * @param tpe The [[TypeInformation]] of the resulting [[DataStream]]. - * @tparam A The type of the resulting [[DataStream]]. - * @return The [[DataStream]] that corresponds to the translated [[Table]]. + * @param logicalPlan The logical plan to translate. + * @return The [[DataStream]] of type [[CRow]]. 
*/ - protected def translate[A]( - logicalPlan: RelNode, - logicalType: RelDataType, - wrapToTuple: Boolean)(implicit tpe: TypeInformation[A]): DataStream[JTuple2[Boolean, A]] = { - - TableEnvironment.validateType(tpe) + protected def translateToCRow( + logicalPlan: RelNode): DataStream[CRow] = { logicalPlan match { case node: DataStreamRel => - val plan = node.translateToPlan(this) - val conversion = - getTupleConversionMapper(plan.getType, logicalType, tpe, "DataStreamSinkConversion") - conversion match { - case None => plan.asInstanceOf[DataStream[JTuple2[Boolean, A]]] // no conversion necessary - case Some(mapFunction: MapFunction[CRow, JTuple2[Boolean, A]]) => - plan.map(mapFunction) - .returns(new TupleTypeInfo[JTuple2[Boolean, A]](BasicTypeInfo.BOOLEAN_TYPE_INFO, tpe)) - .name(s"to: ${tpe.getTypeClass.getSimpleName}") - .asInstanceOf[DataStream[JTuple2[Boolean, A]]] - } - + node.translateToPlan(this) case _ => throw TableException("Cannot generate DataStream due to an invalid logical plan. " + "This is a bug and should not happen. Please file an issue.") @@ -551,10 +637,8 @@ abstract class StreamTableEnvironment( */ def explain(table: Table): String = { val ast = table.getRelNode - val optimizedPlan = optimize(ast) - val dataStream = translate[CRow]( - optimizedPlan, - ast.getRowType)(new GenericTypeInfo(classOf[CRow])) + val optimizedPlan = optimize(ast, updatesAsRetraction = false) + val dataStream = translateToCRow(optimizedPlan) val env = dataStream.getExecutionEnvironment val jsonSqlPlan = env.getExecutionPlan @@ -574,4 +658,90 @@ abstract class StreamTableEnvironment( s"$sqlPlan" } + private class AppendOnlyValidator extends RelVisitor { + + var isAppendOnly = true + + override def visit(node: RelNode, ordinal: Int, parent: RelNode): Unit = { + node match { + case s: DataStreamRel if s.producesUpdates => + isAppendOnly = false + case _ => + super.visit(node, ordinal, parent) + } + } + } + + /** Identifies unique key fields in the output of a RelNode. 
*/ + private class UniqueKeyExtractor extends RelVisitor { + + var keys: Option[Array[String]] = None + + override def visit(node: RelNode, ordinal: Int, parent: RelNode): Unit = { + node match { + case c: DataStreamCalc => + super.visit(node, ordinal, parent) + // check if input has keys + if (keys.isDefined) { + // track keys forward + val inNames = c.getInput.getRowType.getFieldNames + val inOutNames = c.getProgram.getNamedProjects.asScala + .map(p => { + c.getProgram.expandLocalRef(p.left) match { + // output field is forwarded input field + case i: RexInputRef => (i.getIndex, p.right) + // output field is renamed input field + case a: RexCall if a.getKind.equals(SqlKind.AS) => + a.getOperands.get(0) match { + case ref: RexInputRef => + (ref.getIndex, p.right) + case _ => + (-1, p.right) + } + // output field is not forwarded from input + case _: RexNode => (-1, p.right) + } + }) + // filter all non-forwarded fields + .filter(_._1 >= 0) + // resolve names of input fields + .map(io => (inNames.get(io._1), io._2)) + + // filter by input keys + val outKeys = inOutNames.filter(io => keys.get.contains(io._1)).map(_._2) + // check if all keys have been preserved + if (outKeys.nonEmpty && outKeys.length == keys.get.length) { + // all key have been preserved (but possibly renamed) + keys = Some(outKeys.toArray) + } else { + // some (or all) keys have been removed. 
Keys are no longer unique and removed + keys = None + } + } + case _: DataStreamOverAggregate => + super.visit(node, ordinal, parent) + // keys are always forwarded by Over aggregate + case a: DataStreamGroupAggregate => + // get grouping keys + val groupKeys = a.getRowType.getFieldNames.asScala.take(a.getGroupings.length) + keys = Some(groupKeys.toArray) + case w: DataStreamGroupWindowAggregate => + // get grouping keys + val groupKeys = + w.getRowType.getFieldNames.asScala.take(w.getGroupings.length).toArray + // get window start and end time + val windowStartEnd = w.getWindowProperties.map(_.name) + // we have only a unique key if at least one window property is selected + if (windowStartEnd.nonEmpty) { + keys = Some(groupKeys ++ windowStartEnd) + } + case _: DataStreamRel => + // anything else does not forward keys or might duplicate key, so we can stop + keys = None + } + } + + } + } + http://git-wip-us.apache.org/repos/asf/flink/blob/f37988c1/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableConfig.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableConfig.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableConfig.scala index d296978..6448657 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableConfig.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableConfig.scala @@ -37,11 +37,6 @@ class TableConfig { private var nullCheck: Boolean = true /** - * Defines whether sink table requires that update and delete changes are sent with retraction - */ - private var needsUpdatesAsRetractionForSink: Boolean = false - - /** * Defines the configuration of Calcite for Table API and SQL queries. 
*/ private var calciteConfig = CalciteConfig.DEFAULT @@ -72,18 +67,6 @@ class TableConfig { } /** - * Returns the need retraction property for table sink. - */ - def getNeedsUpdatesAsRetractionForSink = needsUpdatesAsRetractionForSink - - /** - * Set the need retraction property for table sink. - */ - def setNeedsUpdatesAsRetractionForSink(needsUpdatesAsRetraction: Boolean ): Unit = { - this.needsUpdatesAsRetractionForSink = needsUpdatesAsRetraction - } - - /** * Returns the current configuration of Calcite for Table API and SQL queries. */ def getCalciteConfig: CalciteConfig = calciteConfig http://git-wip-us.apache.org/repos/asf/flink/blob/f37988c1/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala index 5b752ab..bb0de3e 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala @@ -51,15 +51,14 @@ import org.apache.flink.table.catalog.{ExternalCatalog, ExternalCatalogSchema} import org.apache.flink.table.codegen.{CodeGenerator, ExpressionReducer, GeneratedFunction} import org.apache.flink.table.expressions.{Alias, Expression, UnresolvedFieldReference} import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils._ -import org.apache.flink.table.functions.{ScalarFunction, TableFunction, AggregateFunction} +import org.apache.flink.table.functions.AggregateFunction import org.apache.flink.table.expressions._ import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils.{checkForInstantiation, checkNotSingleton, createScalarSqlFunction, createTableSqlFunctions} import 
org.apache.flink.table.functions.{ScalarFunction, TableFunction} import org.apache.flink.table.plan.cost.DataSetCostFactory import org.apache.flink.table.plan.logical.{CatalogNode, LogicalRelNode} import org.apache.flink.table.plan.rules.FlinkRuleSets -import org.apache.flink.table.plan.schema.RelTable -import org.apache.flink.table.runtime.types.CRowTypeInfo +import org.apache.flink.table.plan.schema.{RelTable, RowSchema} import org.apache.flink.table.sinks.TableSink import org.apache.flink.table.sources.{DefinedFieldNames, TableSource} import org.apache.flink.table.validate.FunctionCatalog @@ -620,7 +619,7 @@ abstract class TableEnvironment(val config: TableConfig) { throw new TableException( "An input of GenericTypeInfo<Row> cannot be converted to Table. " + "Please specify the type of the input with a RowTypeInfo.") - case a: AtomicType[A] => + case a: AtomicType[_] => exprs.zipWithIndex flatMap { case (UnresolvedFieldReference(name), idx) => if (idx > 0) { @@ -691,53 +690,32 @@ abstract class TableEnvironment(val config: TableConfig) { (fieldNames.toArray, fieldIndexes.toArray) } - /** - * Creates a final converter that maps the internal row type to external type. - * - * @param physicalTypeInfo the input of the sink - * @param logicalRowType the logical type with correct field names (esp. for POJO field mapping) - * @param requestedTypeInfo the output type of the sink - * @param functionName name of the map function. Must not be unique but has to be a - * valid Java class identifier. 
- */ - protected def getConversionMapper[IN, OUT]( - physicalTypeInfo: TypeInformation[IN], - logicalRowType: RelDataType, - requestedTypeInfo: TypeInformation[OUT], - functionName: String): - Option[MapFunction[IN, OUT]] - protected def generateRowConverterFunction[OUT]( inputTypeInfo: TypeInformation[Row], - logicalRowType: RelDataType, + schema: RowSchema, requestedTypeInfo: TypeInformation[OUT], functionName: String): GeneratedFunction[MapFunction[Row, OUT], OUT] = { // validate that at least the field types of physical and logical type match // we do that here to make sure that plan translation was correct - val logicalRowTypeInfo = FlinkTypeFactory.toInternalRowTypeInfo(logicalRowType) - if (logicalRowTypeInfo != inputTypeInfo) { + if (schema.physicalTypeInfo != inputTypeInfo) { throw TableException("The field types of physical and logical row types do not match." + "This is a bug and should not happen. Please file an issue.") } - // convert to type information - val logicalFieldTypes = logicalRowType.getFieldList.asScala - .map(t => FlinkTypeFactory.toTypeInfo(t.getType)) - - // field names - val logicalFieldNames = logicalRowType.getFieldNames.asScala + val fieldTypes = schema.physicalFieldTypeInfo + val fieldNames = schema.physicalFieldNames // validate requested type - if (requestedTypeInfo.getArity != logicalFieldTypes.length) { + if (requestedTypeInfo.getArity != fieldTypes.length) { throw new TableException("Arity of result does not match requested type.") } requestedTypeInfo match { // POJO type requested case pt: PojoTypeInfo[_] => - logicalFieldNames.zip(logicalFieldTypes) foreach { + fieldNames.zip(fieldTypes) foreach { case (fName, fType) => val pojoIdx = pt.getFieldIndex(fName) if (pojoIdx < 0) { @@ -752,7 +730,7 @@ abstract class TableEnvironment(val config: TableConfig) { // Tuple/Case class/Row type requested case tt: TupleTypeInfoBase[_] => - logicalFieldTypes.zipWithIndex foreach { + fieldTypes.zipWithIndex foreach { case (fieldTypeInfo, i) 
=> val requestedTypeInfo = tt.getTypeAt(i) if (fieldTypeInfo != requestedTypeInfo) { @@ -763,11 +741,11 @@ abstract class TableEnvironment(val config: TableConfig) { // Atomic type requested case at: AtomicType[_] => - if (logicalFieldTypes.size != 1) { + if (fieldTypes.size != 1) { throw new TableException(s"Requested result type is an atomic type but " + s"result has more or less than a single field.") } - val fieldTypeInfo = logicalFieldTypes.head + val fieldTypeInfo = fieldTypes.head if (fieldTypeInfo != at) { throw new TableException(s"Result field does not match requested type. " + s"Requested: $at; Actual: $fieldTypeInfo") @@ -787,7 +765,7 @@ abstract class TableEnvironment(val config: TableConfig) { val conversion = generator.generateConverterResultExpression( requestedTypeInfo, - logicalFieldNames) + fieldNames) val body = s""" http://git-wip-us.apache.org/repos/asf/flink/blob/f37988c1/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/StreamTableEnvironment.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/StreamTableEnvironment.scala index a649584..a70bcca 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/StreamTableEnvironment.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/StreamTableEnvironment.scala @@ -18,12 +18,14 @@ package org.apache.flink.table.api.java import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.api.java.typeutils.TypeExtractor +import org.apache.flink.api.java.typeutils.{TupleTypeInfo, TypeExtractor} +import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2} import org.apache.flink.table.api._ import org.apache.flink.table.functions.{AggregateFunction, TableFunction} import 
org.apache.flink.table.expressions.ExpressionParser import org.apache.flink.streaming.api.datastream.DataStream import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment +import _root_.java.lang.{Boolean => JBool} /** * The [[TableEnvironment]] for a Java [[StreamExecutionEnvironment]]. @@ -132,7 +134,10 @@ class StreamTableEnvironment( } /** - * Converts the given [[Table]] into a [[DataStream]] of a specified type. + * Converts the given [[Table]] into an append [[DataStream]] of a specified type. + * + * The [[Table]] must only have insert (append) changes. If the [[Table]] is also modified + * by update or delete changes, the conversion will fail. * * The fields of the [[Table]] are mapped to [[DataStream]] fields as follows: * - [[org.apache.flink.types.Row]] and [[org.apache.flink.api.java.tuple.Tuple]] @@ -145,11 +150,16 @@ class StreamTableEnvironment( * @return The converted [[DataStream]]. */ def toDataStream[T](table: Table, clazz: Class[T]): DataStream[T] = { - translate[T](table)(TypeExtractor.createTypeInfo(clazz)) + val typeInfo = TypeExtractor.createTypeInfo(clazz) + TableEnvironment.validateType(typeInfo) + translate[T](table, updatesAsRetraction = false, withChangeFlag = false)(typeInfo) } /** - * Converts the given [[Table]] into a [[DataStream]] of a specified type. + * Converts the given [[Table]] into an append [[DataStream]] of a specified type. + * + * The [[Table]] must only have insert (append) changes. If the [[Table]] is also modified + * by update or delete changes, the conversion will fail. * * The fields of the [[Table]] are mapped to [[DataStream]] fields as follows: * - [[org.apache.flink.types.Row]] and [[org.apache.flink.api.java.tuple.Tuple]] @@ -162,7 +172,68 @@ class StreamTableEnvironment( * @return The converted [[DataStream]]. 
*/ def toDataStream[T](table: Table, typeInfo: TypeInformation[T]): DataStream[T] = { - translate[T](table)(typeInfo) + TableEnvironment.validateType(typeInfo) + translate[T](table, updatesAsRetraction = false, withChangeFlag = false)(typeInfo) + } + + /** + * Converts the given [[Table]] into a [[DataStream]] of add and retract messages. + * The message will be encoded as [[JTuple2]]. The first field is a [[JBool]] flag, + * the second field holds the record of the specified type [[T]]. + * + * A true [[JBool]] flag indicates an add message, a false flag indicates a retract message. + * + * The fields of the [[Table]] are mapped to the requested type as follows: + * - [[org.apache.flink.types.Row]] and [[org.apache.flink.api.java.tuple.Tuple]] + * types: Fields are mapped by position, field types must match. + * - POJO [[DataStream]] types: Fields are mapped by field name, field types must match. + * + * @param table The [[Table]] to convert. + * @param clazz The class of the requested record type. + * @tparam T The type of the requested record type. + * @return The converted [[DataStream]]. + */ + def toRetractStream[T](table: Table, clazz: Class[T]): + DataStream[JTuple2[JBool, T]] = { + + val typeInfo = TypeExtractor.createTypeInfo(clazz) + TableEnvironment.validateType(typeInfo) + val resultType = new TupleTypeInfo[JTuple2[JBool, T]](Types.BOOLEAN, typeInfo) + translate[JTuple2[JBool, T]]( + table, + updatesAsRetraction = true, + withChangeFlag = true)(resultType) + } + + /** + * Converts the given [[Table]] into a [[DataStream]] of add and retract messages. + * The message will be encoded as [[JTuple2]]. The first field is a [[JBool]] flag, + * the second field holds the record of the specified type [[T]]. + * + * A true [[JBool]] flag indicates an add message, a false flag indicates a retract message. 
+ * + * The fields of the [[Table]] are mapped to the requested type as follows: + * - [[org.apache.flink.types.Row]] and [[org.apache.flink.api.java.tuple.Tuple]] + * types: Fields are mapped by position, field types must match. + * - POJO [[DataStream]] types: Fields are mapped by field name, field types must match. + * + * @param table The [[Table]] to convert. + * @param typeInfo The [[TypeInformation]] of the requested record type. + * @tparam T The type of the requested record type. + * @return The converted [[DataStream]]. + */ + def toRetractStream[T](table: Table, typeInfo: TypeInformation[T]): + DataStream[JTuple2[JBool, T]] = { + + TableEnvironment.validateType(typeInfo) + val resultTypeInfo = new TupleTypeInfo[JTuple2[JBool, T]]( + Types.BOOLEAN, + typeInfo + ) + translate[JTuple2[JBool, T]]( + table, + updatesAsRetraction = true, + withChangeFlag = true)(resultTypeInfo) } /** http://git-wip-us.apache.org/repos/asf/flink/blob/f37988c1/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/StreamTableEnvironment.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/StreamTableEnvironment.scala index 0552d7c..e5ad6c2 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/StreamTableEnvironment.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/StreamTableEnvironment.scala @@ -17,10 +17,11 @@ */ package org.apache.flink.table.api.scala +import org.apache.flink.api.scala._ import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.table.api.{TableEnvironment, Table, TableConfig} -import org.apache.flink.table.functions.{AggregateFunction, TableFunction} +import org.apache.flink.table.api.{Table, TableConfig, TableEnvironment} 
import org.apache.flink.table.expressions.Expression +import org.apache.flink.table.functions.{AggregateFunction, TableFunction} import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment} import org.apache.flink.streaming.api.scala.asScalaStream @@ -127,11 +128,14 @@ class StreamTableEnvironment( } /** - * Converts the given [[Table]] into a [[DataStream]] of a specified type. + * Converts the given [[Table]] into an append [[DataStream]] of a specified type. + * + * The [[Table]] must only have insert (append) changes. If the [[Table]] is also modified + * by update or delete changes, the conversion will fail. * * The fields of the [[Table]] are mapped to [[DataStream]] fields as follows: - * - [[org.apache.flink.types.Row]] and [[org.apache.flink.api.java.tuple.Tuple]] - * types: Fields are mapped by position, field types must match. + * - [[org.apache.flink.types.Row]] and Scala Tuple types: Fields are mapped by position, field + * types must match. * - POJO [[DataStream]] types: Fields are mapped by field name, field types must match. * * @param table The [[Table]] to convert. @@ -139,7 +143,24 @@ class StreamTableEnvironment( * @return The converted [[DataStream]]. */ def toDataStream[T: TypeInformation](table: Table): DataStream[T] = { - asScalaStream(translate(table)) + val returnType = createTypeInformation[T] + asScalaStream(translate(table, updatesAsRetraction = false, withChangeFlag = false)(returnType)) + } + +/** + * Converts the given [[Table]] into a [[DataStream]] of add and retract messages. + * The message will be encoded as [[Tuple2]]. The first field is a [[Boolean]] flag, + * the second field holds the record of the specified type [[T]]. + * + * A true [[Boolean]] flag indicates an add message, a false flag indicates a retract message. + * + * @param table The [[Table]] to convert. + * @tparam T The type of the requested data type. + * @return The converted [[DataStream]]. 
+ */ + def toRetractStream[T: TypeInformation](table: Table): DataStream[(Boolean, T)] = { + val returnType = createTypeInformation[(Boolean, T)] + asScalaStream(translate(table, updatesAsRetraction = true, withChangeFlag = true)(returnType)) } /** http://git-wip-us.apache.org/repos/asf/flink/blob/f37988c1/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/TableConversions.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/TableConversions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/TableConversions.scala index 2a0d571..5efff62 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/TableConversions.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/TableConversions.scala @@ -57,5 +57,24 @@ class TableConversions(table: Table) { } } + /** Converts the [[Table]] to a [[DataStream]] of add and retract messages. + * The message will be encoded as [[Tuple2]]. The first field is a [[Boolean]] flag, + * the second field holds the record of the specified type [[T]]. + * + * A true [[Boolean]] flag indicates an add message, a false flag indicates a retract message. 
+ * + */ + def toRetractStream[T: TypeInformation]: DataStream[(Boolean, T)] = { + + table.tableEnv match { + case tEnv: ScalaStreamTableEnv => + tEnv.toRetractStream(table) + case _ => + throw new TableException( + "Only tables that originate from Scala DataStreams " + + "can be converted to Scala DataStreams.") + } + } + } http://git-wip-us.apache.org/repos/asf/flink/blob/f37988c1/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala index dd8265b..310a75f 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala @@ -755,7 +755,9 @@ class Table( * * A batch [[Table]] can only be written to a * [[org.apache.flink.table.sinks.BatchTableSink]], a streaming [[Table]] requires a - * [[org.apache.flink.table.sinks.StreamTableSink]]. + * [[org.apache.flink.table.sinks.AppendStreamTableSink]], a + * [[org.apache.flink.table.sinks.RetractStreamTableSink]], or an + * [[org.apache.flink.table.sinks.UpsertStreamTableSink]]. * * @param sink The [[TableSink]] to which the [[Table]] is written. * @tparam T The data type that the [[TableSink]] expects. 
http://git-wip-us.apache.org/repos/asf/flink/blob/f37988c1/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala index 59f723ac..ce0f966 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala @@ -100,11 +100,17 @@ class DataStreamCalc( calcProgram, config) + val inputParallelism = inputDataStream.getParallelism + val mapFunc = new CRowFlatMapRunner( genFunction.name, genFunction.code, CRowTypeInfo(schema.physicalTypeInfo)) - inputDataStream.flatMap(mapFunc).name(calcOpName(calcProgram, getExpressionString)) + inputDataStream + .flatMap(mapFunc) + .name(calcOpName(calcProgram, getExpressionString)) + // keep parallelism to ensure order of accumulate and retract messages + .setParallelism(inputParallelism) } } http://git-wip-us.apache.org/repos/asf/flink/blob/f37988c1/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala index 056cda9..18f1fc8 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala +++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala @@ -67,6 +67,8 @@ class DataStreamGroupAggregate( override def consumesRetractions = true + def getGroupings: Array[Int] = groupings + override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = { new DataStreamGroupAggregate( cluster, http://git-wip-us.apache.org/repos/asf/flink/blob/f37988c1/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala index f61828b..1be1896 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala @@ -59,6 +59,10 @@ class DataStreamGroupWindowAggregate( override def consumesRetractions = true + def getGroupings: Array[Int] = grouping + + def getWindowProperties: Seq[NamedWindowProperty] = namedProperties + override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = { new DataStreamGroupWindowAggregate( window, http://git-wip-us.apache.org/repos/asf/flink/blob/f37988c1/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/retractionTraits.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/retractionTraits.scala 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/retractionTraits.scala index c3b43ba..173b7d3 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/retractionTraits.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/retractionTraits.scala @@ -82,19 +82,38 @@ object AccModeTrait { } /** - * The AccMode indicates which kinds of messages a [[org.apache.calcite.rel.RelNode]] might - * produce. - * In [[AccMode.Acc]] the node only emit accumulate messages. - * In [[AccMode.AccRetract]], the node produces accumulate messages for insert changes, - * retraction messages for delete changes, and accumulate and retraction messages - * for update changes. + * The [[AccMode]] determines how insert, update, and delete changes of tables are encoded + * by the messages that an operator emits. */ object AccMode extends Enumeration { type AccMode = Value - val Acc = Value // Operator produces only accumulate (insert) messages - val AccRetract = Value // Operator produces accumulate (insert, update) and - // retraction (delete, update) messages + /** + * An operator in [[Acc]] mode emits change messages as + * [[org.apache.flink.table.runtime.types.CRow]] which encode a pair of (Boolean, Row). + * + * An operator in [[Acc]] mode may only produce update and delete messages if the table has + * a unique key and all key attributes are contained in the Row. + * + * Changes are encoded as follows: + * - insert: (true, NewRow) + * - update: (true, NewRow) // the Row includes the full unique key to identify the row to update + * - delete: (false, OldRow) // the Row includes the full unique key to identify the row to delete + * + */ + val Acc = Value + + /** + * An operator in [[AccRetract]] mode emits change messages as + * [[org.apache.flink.table.runtime.types.CRow]] which encode a pair of (Boolean, Row).
+ * + * Changes are encoded as follows: + * - insert: (true, NewRow) + * - update: (false, OldRow), (true, NewRow) // updates are encoded in two messages! + * - delete: (false, OldRow) + * + */ + val AccRetract = Value } http://git-wip-us.apache.org/repos/asf/flink/blob/f37988c1/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamRetractionRules.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamRetractionRules.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamRetractionRules.scala index 97c0dbb..f0b725d 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamRetractionRules.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamRetractionRules.scala @@ -102,17 +102,17 @@ object DataStreamRetractionRules { val rel = call.rel(0).asInstanceOf[DataStreamRel] val traits = rel.getTraitSet - val traitsWithUpdateAsRetrac = + val traitsWithUpdateAsRetraction = if (null == traits.getTrait(UpdateAsRetractionTraitDef.INSTANCE)) { traits.plus(UpdateAsRetractionTrait.DEFAULT) } else { traits } val traitsWithAccMode = - if (null == traitsWithUpdateAsRetrac.getTrait(AccModeTraitDef.INSTANCE)) { - traitsWithUpdateAsRetrac.plus(AccModeTrait.DEFAULT) + if (null == traitsWithUpdateAsRetraction.getTrait(AccModeTraitDef.INSTANCE)) { + traitsWithUpdateAsRetraction.plus(AccModeTrait.DEFAULT) } else { - traitsWithUpdateAsRetrac + traitsWithUpdateAsRetraction } if (traits != traitsWithAccMode) { @@ -122,8 +122,8 @@ object DataStreamRetractionRules { } /** - * Rule that annotates all [[DataStreamRel]] nodes that need to sent out update and delete - * changes as retraction messages. 
+ * Rule that annotates all [[DataStreamRel]] nodes that need to send out update changes with + * retraction messages. */ class SetUpdatesAsRetractionRule extends RelOptRule( operand( "SetUpdatesAsRetractionRule") { /** - * Checks if a [[RelNode]] requires that update and delete changes are sent with retraction + * Checks if a [[RelNode]] requires that update changes are sent with retraction * messages. */ def needsUpdatesAsRetraction(node: RelNode): Boolean = { @@ -142,7 +142,7 @@ } /** - * Annotates a [[RelNode]] to send out update and delete changes as retraction messages. + * Annotates a [[RelNode]] to send out update changes with retraction messages. */ def setUpdatesAsRetraction(relNode: RelNode): RelNode = { val traitSet = relNode.getTraitSet http://git-wip-us.apache.org/repos/asf/flink/blob/f37988c1/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowCorrelateFlatMapRunner.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowCorrelateFlatMapRunner.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowCorrelateFlatMapRunner.scala index 66e51b1..ff3821a 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowCorrelateFlatMapRunner.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowCorrelateFlatMapRunner.scala @@ -37,7 +37,7 @@ class CRowCorrelateFlatMapRunner( flatMapCode: String, collectorName: String, collectorCode: String, - @transient returnType: TypeInformation[CRow]) + @transient var returnType: TypeInformation[CRow]) extends RichFlatMapFunction[CRow, CRow] with ResultTypeQueryable[CRow] with Compiler[Any] {
http://git-wip-us.apache.org/repos/asf/flink/blob/f37988c1/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowFlatMapRunner.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowFlatMapRunner.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowFlatMapRunner.scala index 9a4650b..9701cb9 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowFlatMapRunner.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowFlatMapRunner.scala @@ -35,7 +35,7 @@ import org.slf4j.LoggerFactory class CRowFlatMapRunner( name: String, code: String, - @transient returnType: TypeInformation[CRow]) + @transient var returnType: TypeInformation[CRow]) extends RichFlatMapFunction[CRow, CRow] with ResultTypeQueryable[CRow] with Compiler[FlatMapFunction[Row, Row]] { http://git-wip-us.apache.org/repos/asf/flink/blob/f37988c1/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowInputMapRunner.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowInputMapRunner.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowInputMapRunner.scala index 8e95c93..109c6e1 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowInputMapRunner.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowInputMapRunner.scala @@ -33,7 +33,7 @@ import org.slf4j.LoggerFactory class CRowInputMapRunner[OUT]( name: String, code: String, - @transient returnType: TypeInformation[OUT]) + @transient var returnType: TypeInformation[OUT]) extends RichMapFunction[CRow, OUT] with ResultTypeQueryable[OUT] with Compiler[MapFunction[Row, OUT]] { 
http://git-wip-us.apache.org/repos/asf/flink/blob/f37988c1/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowInputTupleOutputMapRunner.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowInputTupleOutputMapRunner.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowInputTupleOutputMapRunner.scala index 54bbf7e..7c96437 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowInputTupleOutputMapRunner.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowInputTupleOutputMapRunner.scala @@ -18,6 +18,8 @@ package org.apache.flink.table.runtime +import java.lang.{Boolean => JBool} + import org.apache.flink.api.common.functions.{MapFunction, RichMapFunction} import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.typeutils.ResultTypeQueryable @@ -28,36 +30,63 @@ import org.apache.flink.types.Row import org.slf4j.LoggerFactory import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2} - /** - * Convert [[CRow]] to a [[Tuple2]] + * Convert [[CRow]] to a [[JTuple2]] */ -class CRowInputTupleOutputMapRunner[OUT]( +class CRowInputJavaTupleOutputMapRunner( name: String, code: String, - @transient returnType: TypeInformation[JTuple2[Boolean, OUT]]) - extends RichMapFunction[CRow, JTuple2[Boolean, OUT]] - with ResultTypeQueryable[JTuple2[Boolean, OUT]] - with Compiler[MapFunction[Row, OUT]] { + @transient var returnType: TypeInformation[JTuple2[JBool, Any]]) + extends RichMapFunction[CRow, Any] + with ResultTypeQueryable[JTuple2[JBool, Any]] + with Compiler[MapFunction[Row, Any]] { val LOG = LoggerFactory.getLogger(this.getClass) - private var function: MapFunction[Row, OUT] = _ - private var tupleWrapper: JTuple2[Boolean, OUT] = _ + private var function: MapFunction[Row, Any] = _ + private var 
tupleWrapper: JTuple2[JBool, Any] = _ override def open(parameters: Configuration): Unit = { LOG.debug(s"Compiling MapFunction: $name \n\n Code:\n$code") val clazz = compile(getRuntimeContext.getUserCodeClassLoader, name, code) LOG.debug("Instantiating MapFunction.") function = clazz.newInstance() - tupleWrapper = new JTuple2[Boolean, OUT]() + tupleWrapper = new JTuple2[JBool, Any]() } - override def map(in: CRow): JTuple2[Boolean, OUT] = { + override def map(in: CRow): JTuple2[JBool, Any] = { tupleWrapper.f0 = in.change tupleWrapper.f1 = function.map(in.row) tupleWrapper } - override def getProducedType: TypeInformation[JTuple2[Boolean, OUT]] = returnType + override def getProducedType: TypeInformation[JTuple2[JBool, Any]] = returnType +} + +/** + * Convert [[CRow]] to a [[Tuple2]] + */ +class CRowInputScalaTupleOutputMapRunner( + name: String, + code: String, + @transient var returnType: TypeInformation[(Boolean, Any)]) + extends RichMapFunction[CRow, (Boolean, Any)] + with ResultTypeQueryable[(Boolean, Any)] + with Compiler[MapFunction[Row, Any]] { + + val LOG = LoggerFactory.getLogger(this.getClass) + + private var function: MapFunction[Row, Any] = _ + + override def open(parameters: Configuration): Unit = { + LOG.debug(s"Compiling MapFunction: $name \n\n Code:\n$code") + val clazz = compile(getRuntimeContext.getUserCodeClassLoader, name, code) + LOG.debug("Instantiating MapFunction.") + function = clazz.newInstance() + } + + override def map(in: CRow): (Boolean, Any) = + (in.change, function.map(in.row)) + + override def getProducedType: TypeInformation[(Boolean, Any)] = returnType } http://git-wip-us.apache.org/repos/asf/flink/blob/f37988c1/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowOutputMapRunner.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowOutputMapRunner.scala 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowOutputMapRunner.scala index 966dea9..cb8f695 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowOutputMapRunner.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowOutputMapRunner.scala @@ -33,7 +33,7 @@ import org.slf4j.LoggerFactory class CRowOutputMapRunner( name: String, code: String, - @transient returnType: TypeInformation[CRow]) + @transient var returnType: TypeInformation[CRow]) extends RichMapFunction[Any, CRow] with ResultTypeQueryable[CRow] with Compiler[MapFunction[Any, Row]] { http://git-wip-us.apache.org/repos/asf/flink/blob/f37988c1/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CorrelateFlatMapRunner.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CorrelateFlatMapRunner.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CorrelateFlatMapRunner.scala index a0415e1..478b6b6 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CorrelateFlatMapRunner.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CorrelateFlatMapRunner.scala @@ -32,7 +32,7 @@ class CorrelateFlatMapRunner[IN, OUT]( flatMapCode: String, collectorName: String, collectorCode: String, - @transient returnType: TypeInformation[OUT]) + @transient var returnType: TypeInformation[OUT]) extends RichFlatMapFunction[IN, OUT] with ResultTypeQueryable[OUT] with Compiler[Any] { http://git-wip-us.apache.org/repos/asf/flink/blob/f37988c1/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/FlatJoinRunner.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/FlatJoinRunner.scala 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/FlatJoinRunner.scala index 715848d..67acc0b 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/FlatJoinRunner.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/FlatJoinRunner.scala @@ -29,7 +29,7 @@ import org.slf4j.LoggerFactory class FlatJoinRunner[IN1, IN2, OUT]( name: String, code: String, - @transient returnType: TypeInformation[OUT]) + @transient var returnType: TypeInformation[OUT]) extends RichFlatJoinFunction[IN1, IN2, OUT] with ResultTypeQueryable[OUT] with Compiler[FlatJoinFunction[IN1, IN2, OUT]] { http://git-wip-us.apache.org/repos/asf/flink/blob/f37988c1/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/FlatMapRunner.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/FlatMapRunner.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/FlatMapRunner.scala index 2e37baf..938da59 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/FlatMapRunner.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/FlatMapRunner.scala @@ -31,7 +31,7 @@ import org.slf4j.LoggerFactory class FlatMapRunner( name: String, code: String, - @transient returnType: TypeInformation[Row]) + @transient var returnType: TypeInformation[Row]) extends RichFlatMapFunction[Row, Row] with ResultTypeQueryable[Row] with Compiler[FlatMapFunction[Row, Row]] { http://git-wip-us.apache.org/repos/asf/flink/blob/f37988c1/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/MapJoinLeftRunner.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/MapJoinLeftRunner.scala 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/MapJoinLeftRunner.scala index 644e855..5f3dbb4 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/MapJoinLeftRunner.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/MapJoinLeftRunner.scala @@ -24,7 +24,7 @@ import org.apache.flink.util.Collector class MapJoinLeftRunner[IN1, IN2, OUT]( name: String, code: String, - @transient returnType: TypeInformation[OUT], + returnType: TypeInformation[OUT], broadcastSetName: String) extends MapSideJoinRunner[IN1, IN2, IN2, IN1, OUT](name, code, returnType, broadcastSetName) { http://git-wip-us.apache.org/repos/asf/flink/blob/f37988c1/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/MapJoinRightRunner.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/MapJoinRightRunner.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/MapJoinRightRunner.scala index eee38d1..e2d9331 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/MapJoinRightRunner.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/MapJoinRightRunner.scala @@ -24,7 +24,7 @@ import org.apache.flink.util.Collector class MapJoinRightRunner[IN1, IN2, OUT]( name: String, code: String, - @transient returnType: TypeInformation[OUT], + returnType: TypeInformation[OUT], broadcastSetName: String) extends MapSideJoinRunner[IN1, IN2, IN1, IN2, OUT](name, code, returnType, broadcastSetName) { http://git-wip-us.apache.org/repos/asf/flink/blob/f37988c1/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/MapRunner.scala ---------------------------------------------------------------------- diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/MapRunner.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/MapRunner.scala index 32562c7..14eeecf 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/MapRunner.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/MapRunner.scala @@ -28,7 +28,7 @@ import org.slf4j.LoggerFactory class MapRunner[IN, OUT]( name: String, code: String, - @transient returnType: TypeInformation[OUT]) + @transient var returnType: TypeInformation[OUT]) extends RichMapFunction[IN, OUT] with ResultTypeQueryable[OUT] with Compiler[MapFunction[IN, OUT]] { http://git-wip-us.apache.org/repos/asf/flink/blob/f37988c1/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/MapSideJoinRunner.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/MapSideJoinRunner.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/MapSideJoinRunner.scala index 090e184..00b7b8e 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/MapSideJoinRunner.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/MapSideJoinRunner.scala @@ -28,7 +28,7 @@ import org.slf4j.LoggerFactory abstract class MapSideJoinRunner[IN1, IN2, SINGLE_IN, MULTI_IN, OUT]( name: String, code: String, - @transient returnType: TypeInformation[OUT], + @transient var returnType: TypeInformation[OUT], broadcastSetName: String) extends RichFlatMapFunction[MULTI_IN, OUT] with ResultTypeQueryable[OUT] http://git-wip-us.apache.org/repos/asf/flink/blob/f37988c1/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregatePreProcessor.scala ---------------------------------------------------------------------- diff 
--git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregatePreProcessor.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregatePreProcessor.scala index 22a2682..9bcac30 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregatePreProcessor.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregatePreProcessor.scala @@ -41,7 +41,7 @@ class DataSetSessionWindowAggregatePreProcessor( genAggregations: GeneratedAggregationsFunction, keysAndAggregatesArity: Int, gap: Long, - @transient intermediateRowType: TypeInformation[Row]) + @transient var intermediateRowType: TypeInformation[Row]) extends AbstractRichFunction with MapPartitionFunction[Row,Row] with GroupCombineFunction[Row,Row] http://git-wip-us.apache.org/repos/asf/flink/blob/f37988c1/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala index 745f24d..6ee37e6 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala @@ -17,6 +17,8 @@ */ package org.apache.flink.table.runtime.aggregate +import java.lang.{Long => JLong} + import org.apache.flink.configuration.Configuration import org.apache.flink.streaming.api.functions.ProcessFunction import org.apache.flink.types.Row @@ -24,6 +26,7 @@ import 
org.apache.flink.util.Collector import org.apache.flink.api.common.state.ValueStateDescriptor import org.apache.flink.api.java.typeutils.RowTypeInfo import org.apache.flink.api.common.state.ValueState +import org.apache.flink.table.api.Types import org.apache.flink.table.codegen.{Compiler, GeneratedAggregationsFunction} import org.slf4j.LoggerFactory import org.apache.flink.table.runtime.types.CRow @@ -47,7 +50,10 @@ class GroupAggProcessFunction( private var newRow: CRow = _ private var prevRow: CRow = _ private var firstRow: Boolean = _ + // stores the accumulators private var state: ValueState[Row] = _ + // counts the number of added and retracted input records + private var cntState: ValueState[JLong] = _ override def open(config: Configuration) { LOG.debug(s"Compiling AggregateHelper: $genAggregations.name \n\n " + @@ -65,6 +71,9 @@ class GroupAggProcessFunction( val stateDescriptor: ValueStateDescriptor[Row] = new ValueStateDescriptor[Row]("GroupAggregateState", aggregationStateType) state = getRuntimeContext.getState(stateDescriptor) + val inputCntDescriptor: ValueStateDescriptor[JLong] = + new ValueStateDescriptor[JLong]("GroupAggregateInputCounter", Types.LONG) + cntState = getRuntimeContext.getState(inputCntDescriptor) } override def processElement( @@ -74,11 +83,14 @@ class GroupAggProcessFunction( val input = inputC.row - // get accumulators + // get accumulators and input counter var accumulators = state.value() + var inputCnt = cntState.value() + if (null == accumulators) { firstRow = true accumulators = function.createAccumulators() + inputCnt = 0L } else { firstRow = false } @@ -92,29 +104,44 @@ class GroupAggProcessFunction( // update aggregate result and set to the newRow if (inputC.change) { + inputCnt += 1 // accumulate input function.accumulate(accumulators, input) function.setAggregationResults(accumulators, newRow.row) } else { + inputCnt -= 1 // retract input function.retract(accumulators, input) function.setAggregationResults(accumulators, 
newRow.row) } - // update accumulators - state.update(accumulators) - - // if previousRow is not null, do retraction process - if (generateRetraction && !firstRow) { - if (prevRow.row.equals(newRow.row)) { - // ignore same newRow - return - } else { - // retract previous row - out.collect(prevRow) + if (inputCnt != 0) { + // we aggregated at least one record for this key + + // update the state + state.update(accumulators) + cntState.update(inputCnt) + + // if this was not the first row and we have to emit retractions + if (generateRetraction && !firstRow) { + if (prevRow.row.equals(newRow.row)) { + // newRow is the same as before. Do not emit retraction and acc messages + return + } else { + // retract previous result + out.collect(prevRow) + } } - } + // emit the new result + out.collect(newRow) - out.collect(newRow) + } else { + // we retracted the last record for this key + // send out a delete message + out.collect(prevRow) + // and clear all state + state.clear() + cntState.clear() + } } } http://git-wip-us.apache.org/repos/asf/flink/blob/f37988c1/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/io/CRowValuesInputFormat.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/io/CRowValuesInputFormat.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/io/CRowValuesInputFormat.scala index ec73fa6..1cb3a6e 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/io/CRowValuesInputFormat.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/io/CRowValuesInputFormat.scala @@ -30,7 +30,7 @@ import org.slf4j.LoggerFactory class CRowValuesInputFormat( name: String, code: String, - @transient returnType: TypeInformation[CRow]) + @transient var returnType: TypeInformation[CRow]) extends GenericInputFormat[CRow] with NonParallelInput with 
ResultTypeQueryable[CRow] http://git-wip-us.apache.org/repos/asf/flink/blob/f37988c1/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/io/ValuesInputFormat.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/io/ValuesInputFormat.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/io/ValuesInputFormat.scala index d536b39..43ce605 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/io/ValuesInputFormat.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/io/ValuesInputFormat.scala @@ -29,7 +29,7 @@ import org.slf4j.LoggerFactory class ValuesInputFormat( name: String, code: String, - @transient returnType: TypeInformation[Row]) + @transient var returnType: TypeInformation[Row]) extends GenericInputFormat[Row] with NonParallelInput with ResultTypeQueryable[Row] http://git-wip-us.apache.org/repos/asf/flink/blob/f37988c1/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/AppendStreamTableSink.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/AppendStreamTableSink.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/AppendStreamTableSink.scala new file mode 100644 index 0000000..abdca17 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/AppendStreamTableSink.scala @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.sinks + +import org.apache.flink.streaming.api.datastream.DataStream +import org.apache.flink.table.api.Table + +/** + * Defines an external [[TableSink]] to emit streaming [[Table]] with only insert changes. + * + * If the [[Table]] is also modified by update or delete changes, a + * [[org.apache.flink.table.api.TableException]] will be thrown. + * + * @tparam T Type of [[DataStream]] that this [[TableSink]] expects and supports. + */ +trait AppendStreamTableSink[T] extends TableSink[T] { + + /** Emits the DataStream. */ + def emitDataStream(dataStream: DataStream[T]): Unit +}