[FLINK-6093] [table] Implement and turn on retraction for table sinks.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bfed279f Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bfed279f Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bfed279f Branch: refs/heads/master Commit: bfed279f05a0e131c11f963e2380cd4c582e6bc3 Parents: 27bf4ca Author: Hequn Cheng <chenghe...@gmail.com> Authored: Thu Apr 27 23:03:44 2017 +0800 Committer: Fabian Hueske <fhue...@apache.org> Committed: Sat May 6 01:51:55 2017 +0200 ---------------------------------------------------------------------- .../flink/table/api/BatchTableEnvironment.scala | 16 +-- .../table/api/StreamTableEnvironment.scala | 122 +++++++++++++++++-- .../apache/flink/table/api/TableConfig.scala | 17 +++ .../flink/table/api/TableEnvironment.scala | 12 -- .../datastream/DataStreamRetractionRules.scala | 5 +- .../runtime/CRowInputTupleOutputMapRunner.scala | 63 ++++++++++ .../apache/flink/table/sinks/CsvTableSink.scala | 98 ++++++++++++++- .../flink/table/sinks/StreamRetractSink.scala | 35 ++++++ .../flink/table/TableEnvironmentTest.scala | 62 +--------- .../scala/batch/TableEnvironmentITCase.scala | 19 --- .../api/scala/stream/RetractionITCase.scala | 9 +- .../api/scala/stream/TableSinkITCase.scala | 33 ++++- .../api/scala/stream/utils/StreamITCase.scala | 8 -- 13 files changed, 363 insertions(+), 136 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/bfed279f/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala index f7955f0..c7bacfe 100644 --- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala @@ -26,12 +26,11 @@ import org.apache.calcite.rel.RelNode import org.apache.calcite.rel.`type`.RelDataType import org.apache.calcite.sql2rel.RelDecorrelator import org.apache.calcite.tools.RuleSet -import org.apache.flink.api.common.functions.{MapFunction, RichMapFunction} +import org.apache.flink.api.common.functions.MapFunction import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.io.DiscardingOutputFormat import org.apache.flink.api.java.typeutils.GenericTypeInfo import org.apache.flink.api.java.{DataSet, ExecutionEnvironment} -import org.apache.flink.configuration.Configuration import org.apache.flink.table.explain.PlanJsonParser import org.apache.flink.table.expressions.{Expression, RowtimeAttribute, TimeAttribute} import org.apache.flink.table.plan.nodes.FlinkConventions @@ -39,7 +38,6 @@ import org.apache.flink.table.plan.nodes.dataset.DataSetRel import org.apache.flink.table.plan.rules.FlinkRuleSets import org.apache.flink.table.plan.schema.{DataSetTable, TableSourceTable} import org.apache.flink.table.runtime.MapRunner -import org.apache.flink.table.runtime.types.CRow import org.apache.flink.table.sinks.{BatchTableSink, TableSink} import org.apache.flink.table.sources.{BatchTableSource, TableSource} import org.apache.flink.types.Row @@ -150,18 +148,6 @@ abstract class BatchTableEnvironment( if (requestedTypeInfo.getTypeClass == classOf[Row]) { // Row to Row, no conversion needed None - } else if (requestedTypeInfo.getTypeClass == classOf[CRow]) { - // Row to CRow, only needs to be wrapped - Some( - new RichMapFunction[Row, CRow] { - private var outCRow: CRow = _ - override def open(parameters: Configuration): Unit = outCRow = new CRow(null, true) - override def map(value: Row): CRow = { - outCRow.row = value - outCRow 
- } - }.asInstanceOf[MapFunction[IN, OUT]] - ) } else { // some type that is neither Row or CRow http://git-wip-us.apache.org/repos/asf/flink/blob/bfed279f/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala index 0632a47..bd06305 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala @@ -26,22 +26,25 @@ import org.apache.calcite.rel.RelNode import org.apache.calcite.rel.`type`.RelDataType import org.apache.calcite.sql2rel.RelDecorrelator import org.apache.calcite.tools.{RuleSet, RuleSets} -import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation} +import org.apache.flink.api.common.typeinfo.AtomicType import org.apache.flink.api.common.typeutils.CompositeType import org.apache.flink.api.common.functions.MapFunction -import org.apache.flink.api.java.typeutils.GenericTypeInfo +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} +import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2} +import org.apache.flink.api.java.typeutils.{GenericTypeInfo, TupleTypeInfo} import org.apache.flink.streaming.api.datastream.DataStream import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment import org.apache.flink.table.calcite.RelTimeIndicatorConverter import org.apache.flink.table.explain.PlanJsonParser import org.apache.flink.table.expressions.{Expression, ProctimeAttribute, RowtimeAttribute, UnresolvedFieldReference} import org.apache.flink.table.plan.nodes.FlinkConventions -import 
org.apache.flink.table.plan.nodes.datastream.DataStreamRel +import org.apache.flink.table.plan.nodes.datastream.{DataStreamRel, UpdateAsRetractionTrait} import org.apache.flink.table.plan.rules.FlinkRuleSets -import org.apache.flink.table.plan.schema.{DataStreamTable, StreamTableSourceTable} -import org.apache.flink.table.runtime.CRowInputMapRunner +import org.apache.flink.table.plan.schema.StreamTableSourceTable +import org.apache.flink.table.plan.schema.DataStreamTable +import org.apache.flink.table.runtime.{CRowInputMapRunner, CRowInputTupleOutputMapRunner} import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo} -import org.apache.flink.table.sinks.{StreamTableSink, TableSink} +import org.apache.flink.table.sinks.{StreamRetractSink, StreamTableSink, TableSink} import org.apache.flink.table.sources.{StreamTableSource, TableSource} import org.apache.flink.table.typeutils.TypeCheckUtils import org.apache.flink.types.Row @@ -130,6 +133,14 @@ abstract class StreamTableEnvironment( val result: DataStream[T] = translate(table)(outputType) // Give the DataSet to the TableSink to emit it. streamSink.emitDataStream(result) + + case streamRetractSink: StreamRetractSink[T] => + val outputType = sink.getOutputType + this.config.setNeedsUpdatesAsRetractionForSink(streamRetractSink.needsUpdatesAsRetraction) + // translate the Table into a DataStream and provide the type that the TableSink expects. + val result: DataStream[JTuple2[Boolean, T]] = translate(table, true)(outputType) + // Give the DataStream (with change flags) to the TableSink to emit it.
+ streamRetractSink.emitDataStreamWithChange(result) case _ => throw new TableException("StreamTableSink required to emit streaming Table") } @@ -153,7 +164,7 @@ abstract class StreamTableEnvironment( Option[MapFunction[IN, OUT]] = { if (requestedTypeInfo.getTypeClass == classOf[CRow]) { - // CRow to CRow, no conversion needed + // CRow to CRow, no conversion needed (only used to explain a table) None } else if (requestedTypeInfo.getTypeClass == classOf[Row]) { // CRow to Row, only needs to be unwrapped @@ -164,7 +175,6 @@ abstract class StreamTableEnvironment( ) } else { // Some type that is neither CRow nor Row - val converterFunction = generateRowConverterFunction[OUT]( physicalTypeInfo.asInstanceOf[CRowTypeInfo].rowType, logicalRowType, @@ -177,11 +187,40 @@ abstract class StreamTableEnvironment( converterFunction.code, converterFunction.returnType) .asInstanceOf[MapFunction[IN, OUT]]) - } } /** + * Creates a final converter that maps the internal CRow type to an external Tuple2 of + * (change flag, converted record). + * + * @param physicalTypeInfo the physical type of the input (expected to be a CRowTypeInfo) + * @param logicalRowType the logical type with correct field names (esp. for POJO field mapping) + * @param requestedTypeInfo the type of the record in the second Tuple2 field (the sink's type) + * @param functionName name of the map function. Need not be unique but has to be a + * valid Java class identifier.
+ */ + protected def getTupleConversionMapper[IN, OUT]( + physicalTypeInfo: TypeInformation[IN], + logicalRowType: RelDataType, + requestedTypeInfo: TypeInformation[OUT], + functionName: String): + Option[MapFunction[IN, JTuple2[Boolean, OUT]]] = { + + val converterFunction = generateRowConverterFunction( + physicalTypeInfo.asInstanceOf[CRowTypeInfo].rowType, + logicalRowType, + requestedTypeInfo, + functionName + ) + + Some(new CRowInputTupleOutputMapRunner[OUT]( + converterFunction.name, + converterFunction.code, + new TupleTypeInfo(BasicTypeInfo.BOOLEAN_TYPE_INFO, requestedTypeInfo)) + .asInstanceOf[MapFunction[IN, JTuple2[Boolean, OUT]]]) + } + + /** * Registers a [[DataStream]] as a table under a given name in the [[TableEnvironment]]'s * catalog. * @@ -371,7 +410,7 @@ abstract class StreamTableEnvironment( // 5. optimize the physical Flink plan val physicalOptRuleSet = getPhysicalOptRuleSet val physicalOutputProps = relNode.getTraitSet.replace(FlinkConventions.DATASTREAM).simplify() - val physicalPlan = if (physicalOptRuleSet.iterator().hasNext) { + var physicalPlan = if (physicalOptRuleSet.iterator().hasNext) { runVolcanoPlanner(physicalOptRuleSet, logicalPlan, physicalOutputProps) } else { logicalPlan @@ -380,6 +419,12 @@ abstract class StreamTableEnvironment( // 6. decorate the optimized plan val decoRuleSet = getDecoRuleSet val decoratedPlan = if (decoRuleSet.iterator().hasNext) { + + if (this.config.getNeedsUpdatesAsRetractionForSink) { + physicalPlan = physicalPlan.copy( + physicalPlan.getTraitSet.plus(new UpdateAsRetractionTrait(true)), + physicalPlan.getInputs) + } runHepPlanner(HepMatchOrder.BOTTOM_UP, decoRuleSet, physicalPlan, physicalPlan.getTraitSet) } else { physicalPlan @@ -388,7 +433,6 @@ abstract class StreamTableEnvironment( decoratedPlan } - /** * Translates a [[Table]] into a [[DataStream]]. * @@ -444,6 +488,62 @@ abstract class StreamTableEnvironment( } /** + * Translates a [[Table]] into a [[DataStream]] with change information. 
+ * + * The transformation involves optimizing the relational expression tree as defined by + * Table API calls and / or SQL queries and generating corresponding [[DataStream]] operators. + * + * @param table The root node of the relational expression tree. + * @param wrapToTuple True to emit each record together with its change flag as a Tuple2. + * @param tpe The [[TypeInformation]] of the resulting [[DataStream]]. + * @tparam A The type of the resulting [[DataStream]]. + * @return The [[DataStream]] that corresponds to the translated [[Table]]. + */ + protected def translate[A](table: Table, wrapToTuple: Boolean)(implicit tpe: TypeInformation[A]) + : DataStream[JTuple2[Boolean, A]] = { + val relNode = table.getRelNode + val dataStreamPlan = optimize(relNode) + translate(dataStreamPlan, relNode.getRowType, wrapToTuple) + } + + /** + * Translates a logical [[RelNode]] into a [[DataStream]] with change information. + * + * @param logicalPlan The root node of the relational expression tree. + * @param logicalType The row type of the result. Since the logicalPlan can lose the + * field naming during optimization, the row type is passed separately. + * @param wrapToTuple True to emit each record together with its change flag as a Tuple2. + * @param tpe The [[TypeInformation]] of the resulting [[DataStream]]. + * @tparam A The type of the resulting [[DataStream]]. + * @return The [[DataStream]] that corresponds to the translated [[Table]].
+ */ + protected def translate[A]( + logicalPlan: RelNode, + logicalType: RelDataType, + wrapToTuple: Boolean)(implicit tpe: TypeInformation[A]): DataStream[JTuple2[Boolean, A]] = { + + TableEnvironment.validateType(tpe) + + logicalPlan match { + case node: DataStreamRel => + val plan = node.translateToPlan(this) + val conversion = + getTupleConversionMapper(plan.getType, logicalType, tpe, "DataStreamSinkConversion") + conversion match { + case None => plan.asInstanceOf[DataStream[JTuple2[Boolean, A]]] // no conversion necessary + case Some(mapFunction: MapFunction[CRow, JTuple2[Boolean, A]]) => + plan.map(mapFunction) + .returns(new TupleTypeInfo[JTuple2[Boolean, A]](BasicTypeInfo.BOOLEAN_TYPE_INFO, tpe)) + .name(s"to: ${tpe.getTypeClass.getSimpleName}") + .asInstanceOf[DataStream[JTuple2[Boolean, A]]] + } + + case _ => + throw TableException("Cannot generate DataStream due to an invalid logical plan. " + + "This is a bug and should not happen. Please file an issue.") + } + } + + /** * Returns the AST of the specified Table API and SQL queries and the execution plan to compute * the result of the given [[Table]]. 
* http://git-wip-us.apache.org/repos/asf/flink/blob/bfed279f/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableConfig.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableConfig.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableConfig.scala index 6448657..d296978 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableConfig.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableConfig.scala @@ -37,6 +37,11 @@ class TableConfig { private var nullCheck: Boolean = true /** + * Defines whether the table sink requires update and delete changes to be sent as retraction messages. + */ + private var needsUpdatesAsRetractionForSink: Boolean = false + + /** * Defines the configuration of Calcite for Table API and SQL queries. */ private var calciteConfig = CalciteConfig.DEFAULT @@ -67,6 +72,18 @@ class TableConfig { } /** + * Returns whether the table sink requires updates to be sent as retraction messages. + */ + def getNeedsUpdatesAsRetractionForSink = needsUpdatesAsRetractionForSink + + /** + * Sets whether the table sink requires updates to be sent as retraction messages. + */ + def setNeedsUpdatesAsRetractionForSink(needsUpdatesAsRetraction: Boolean ): Unit = { + this.needsUpdatesAsRetractionForSink = needsUpdatesAsRetraction + } + + /** * Returns the current configuration of Calcite for Table API and SQL queries.
*/ def getCalciteConfig: CalciteConfig = calciteConfig http://git-wip-us.apache.org/repos/asf/flink/blob/bfed279f/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala index d27db1e..5b752ab 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala @@ -644,18 +644,6 @@ abstract class TableEnvironment(val config: TableConfig) { case _ => throw new TableException( "Field reference expression or alias on field expression expected.") } - case cr: CRowTypeInfo => - exprs.zipWithIndex.map { - case (UnresolvedFieldReference(name), idx) => (idx, name) - case (Alias(UnresolvedFieldReference(origName), name, _), _) => - val idx = cr.getFieldIndex(origName) - if (idx < 0) { - throw new TableException(s"$origName is not a field of type $cr") - } - (idx, name) - case _ => throw new TableException( - "Field reference expression or alias on field expression expected.") - } case c: CaseClassTypeInfo[A] => exprs.zipWithIndex flatMap { case (UnresolvedFieldReference(name), idx) => http://git-wip-us.apache.org/repos/asf/flink/blob/bfed279f/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamRetractionRules.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamRetractionRules.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamRetractionRules.scala index bd9a7ee..97c0dbb 100644 --- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamRetractionRules.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamRetractionRules.scala @@ -102,13 +102,14 @@ object DataStreamRetractionRules { val rel = call.rel(0).asInstanceOf[DataStreamRel] val traits = rel.getTraitSet - val traitsWithUpdateAsRetrac = if (!traits.contains(UpdateAsRetractionTraitDef.INSTANCE)) { + val traitsWithUpdateAsRetrac = + if (null == traits.getTrait(UpdateAsRetractionTraitDef.INSTANCE)) { traits.plus(UpdateAsRetractionTrait.DEFAULT) } else { traits } val traitsWithAccMode = - if (!traitsWithUpdateAsRetrac.contains(AccModeTraitDef.INSTANCE)) { + if (null == traitsWithUpdateAsRetrac.getTrait(AccModeTraitDef.INSTANCE)) { traitsWithUpdateAsRetrac.plus(AccModeTrait.DEFAULT) } else { traitsWithUpdateAsRetrac http://git-wip-us.apache.org/repos/asf/flink/blob/bfed279f/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowInputTupleOutputMapRunner.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowInputTupleOutputMapRunner.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowInputTupleOutputMapRunner.scala new file mode 100644 index 0000000..54bbf7e --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowInputTupleOutputMapRunner.scala @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime + +import org.apache.flink.api.common.functions.{MapFunction, RichMapFunction} +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.typeutils.ResultTypeQueryable +import org.apache.flink.configuration.Configuration +import org.apache.flink.table.codegen.Compiler +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.types.Row +import org.slf4j.LoggerFactory +import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2} + + +/** + * Convert [[CRow]] to a [[Tuple2]] + */ +class CRowInputTupleOutputMapRunner[OUT]( + name: String, + code: String, + @transient returnType: TypeInformation[JTuple2[Boolean, OUT]]) + extends RichMapFunction[CRow, JTuple2[Boolean, OUT]] + with ResultTypeQueryable[JTuple2[Boolean, OUT]] + with Compiler[MapFunction[Row, OUT]] { + + val LOG = LoggerFactory.getLogger(this.getClass) + + private var function: MapFunction[Row, OUT] = _ + private var tupleWrapper: JTuple2[Boolean, OUT] = _ + + override def open(parameters: Configuration): Unit = { + LOG.debug(s"Compiling MapFunction: $name \n\n Code:\n$code") + val clazz = compile(getRuntimeContext.getUserCodeClassLoader, name, code) + LOG.debug("Instantiating MapFunction.") + function = clazz.newInstance() + tupleWrapper = new JTuple2[Boolean, OUT]() + } + + override def map(in: CRow): JTuple2[Boolean, OUT] = { + tupleWrapper.f0 = in.change + tupleWrapper.f1 = function.map(in.row) + tupleWrapper + } + + override def getProducedType: TypeInformation[JTuple2[Boolean, OUT]] = 
returnType +} http://git-wip-us.apache.org/repos/asf/flink/blob/bfed279f/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/CsvTableSink.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/CsvTableSink.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/CsvTableSink.scala index 4a2fcdf..809afd2 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/CsvTableSink.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/CsvTableSink.scala @@ -25,7 +25,7 @@ import org.apache.flink.types.Row import org.apache.flink.api.java.typeutils.RowTypeInfo import org.apache.flink.core.fs.FileSystem.WriteMode import org.apache.flink.streaming.api.datastream.DataStream -import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo} +import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2} /** * A simple [[TableSink]] to emit data as CSV files. @@ -135,3 +135,99 @@ class CsvFormatter(fieldDelim: String) extends MapFunction[Row, String] { } } +/** + * A simple [[TableSink]] to emit data as CSV files. + * + * @param path The output path to write the Table to. + * @param fieldDelim The field delimiter + * @param numFiles The number of files to write to + * @param writeMode The write mode to specify whether existing files are overwritten or not. + */ +class CsvRetractTableSink( + path: String, + fieldDelim: Option[String], + numFiles: Option[Int], + writeMode: Option[WriteMode]) + extends TableSinkBase[Row] with StreamRetractSink[Row] { + + override def needsUpdatesAsRetraction: Boolean = true + + /** + * A simple [[TableSink]] to emit data as CSV files. + * + * @param path The output path to write the Table to. + * @param fieldDelim The field delimiter, ',' by default. 
+ */ + def this(path: String, fieldDelim: String = ",") { + this(path, Some(fieldDelim), None, None) + } + + /** + * A simple [[TableSink]] to emit data as CSV files. + * + * @param path The output path to write the Table to. + * @param fieldDelim The field delimiter. + * @param numFiles The number of files to write to. + * @param writeMode The write mode to specify whether existing files are overwritten or not. + */ + def this(path: String, fieldDelim: String, numFiles: Int, writeMode: WriteMode) { + this(path, Some(fieldDelim), Some(numFiles), Some(writeMode)) + } + + + override def emitDataStreamWithChange(dataStream: DataStream[JTuple2[Boolean,Row]]): Unit = { + val csvRows = dataStream + .map(new CsvRetractFormatter(fieldDelim.getOrElse(","))) + .returns(TypeInformation.of(classOf[String])) + + + if (numFiles.isDefined) { + csvRows.setParallelism(numFiles.get) + } + + val sink = writeMode match { + case None => csvRows.writeAsText(path) + case Some(wm) => csvRows.writeAsText(path, wm) + } + + if (numFiles.isDefined) { + sink.setParallelism(numFiles.get) + } + } + + override protected def copy: TableSinkBase[Row] = { + new CsvRetractTableSink(path, fieldDelim, numFiles, writeMode) + } + + override def getOutputType: TypeInformation[Row] = { + new RowTypeInfo(getFieldTypes: _*) + } +} + +/** + * Formats a [[Tuple2]] with change information into a [[String]] with fields separated by the + * field delimiter. + * + * @param fieldDelim The field delimiter. 
+ */ +class CsvRetractFormatter(fieldDelim: String) extends MapFunction[JTuple2[Boolean,Row], String] { + override def map(rowT: JTuple2[Boolean,Row]): String = { + + val row: Row = rowT.f1 + + val builder = new StringBuilder + + builder.append(rowT.f0.toString) + + // write following values + for (i <- 0 until row.getArity) { + builder.append(fieldDelim) + val v = row.getField(i) + if (v != null) { + builder.append(v.toString) + } + } + builder.mkString + } +} + http://git-wip-us.apache.org/repos/asf/flink/blob/bfed279f/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/StreamRetractSink.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/StreamRetractSink.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/StreamRetractSink.scala new file mode 100644 index 0000000..7f7c944 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/StreamRetractSink.scala @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.sinks + +import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2} +import org.apache.flink.streaming.api.datastream.DataStream + +trait StreamRetractSink[T] extends TableSink[T]{ + + /** + * Whether this [[StreamRetractSink]] requires that update and delete changes are sent with + * retraction messages. + */ + def needsUpdatesAsRetraction: Boolean = false + + /** Emits the DataStream with change information. */ + def emitDataStreamWithChange(dataStream: DataStream[JTuple2[Boolean,T]]): Unit + +} http://git-wip-us.apache.org/repos/asf/flink/blob/bfed279f/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala index 60de1f1..675e5d9 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala @@ -21,6 +21,7 @@ package org.apache.flink.table import org.apache.flink.api.scala._ import org.apache.flink.api.common.typeinfo.BasicTypeInfo._ import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.typeutils.{TupleTypeInfo, TypeExtractor} import org.apache.flink.table.api.scala._ import org.apache.flink.api.java.typeutils.{GenericTypeInfo, RowTypeInfo, TupleTypeInfo, TypeExtractor} import org.apache.flink.table.api.TableException @@ -62,14 +63,6 @@ class TableEnvironmentTest extends TableTestBase { } @Test - def testGetFieldInfoCRow(): Unit = { - val fieldInfo = tEnv.getFieldInfo(cRowType) - - fieldInfo._1.zip(Array("f0", "f1", "f2")).foreach(x => assertEquals(x._2, x._1)) - fieldInfo._2.zip(Array(0, 1, 2)).foreach(x => 
assertEquals(x._2, x._1)) - } - -
@Test def testGetFieldInfoCClass(): Unit = { val fieldInfo = tEnv.getFieldInfo(caseClassType) @@ -113,20 +106,6 @@ class TableEnvironmentTest extends TableTestBase { } @Test - def testGetFieldInfoCRowNames(): Unit = { - val fieldInfo = tEnv.getFieldInfo( - cRowType, - Array( - UnresolvedFieldReference("name1"), - UnresolvedFieldReference("name2"), - UnresolvedFieldReference("name3") - )) - - fieldInfo._1.zip(Array("name1", "name2", "name3")).foreach(x => assertEquals(x._2, x._1)) - fieldInfo._2.zip(Array(0, 1, 2)).foreach(x => assertEquals(x._2, x._1)) - } - - @Test def testGetFieldInfoCClassNames(): Unit = { val fieldInfo = tEnv.getFieldInfo( caseClassType, @@ -225,45 +204,6 @@ class TableEnvironmentTest extends TableTestBase { } @Test - def testGetFieldInfoCRowAlias1(): Unit = { - val fieldInfo = tEnv.getFieldInfo( - cRowType, - Array( - Alias(UnresolvedFieldReference("f0"), "name1"), - Alias(UnresolvedFieldReference("f1"), "name2"), - Alias(UnresolvedFieldReference("f2"), "name3") - )) - - fieldInfo._1.zip(Array("name1", "name2", "name3")).foreach(x => assertEquals(x._2, x._1)) - fieldInfo._2.zip(Array(0, 1, 2)).foreach(x => assertEquals(x._2, x._1)) - } - - @Test - def testGetFieldInfoCRowAlias2(): Unit = { - val fieldInfo = tEnv.getFieldInfo( - cRowType, - Array( - Alias(UnresolvedFieldReference("f2"), "name1"), - Alias(UnresolvedFieldReference("f0"), "name2"), - Alias(UnresolvedFieldReference("f1"), "name3") - )) - - fieldInfo._1.zip(Array("name1", "name2", "name3")).foreach(x => assertEquals(x._2, x._1)) - fieldInfo._2.zip(Array(2, 0, 1)).foreach(x => assertEquals(x._2, x._1)) - } - - @Test(expected = classOf[TableException]) - def testGetFieldInfoCRowAlias3(): Unit = { - tEnv.getFieldInfo( - cRowType, - Array( - Alias(UnresolvedFieldReference("xxx"), "name1"), - Alias(UnresolvedFieldReference("yyy"), "name2"), - Alias(UnresolvedFieldReference("zzz"), "name3") - )) - } - - @Test def testGetFieldInfoCClassAlias1(): Unit = { val fieldInfo = tEnv.getFieldInfo( 
caseClassType, http://git-wip-us.apache.org/repos/asf/flink/blob/bfed279f/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableEnvironmentITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableEnvironmentITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableEnvironmentITCase.scala index ebfac0a..0c2505a 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableEnvironmentITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableEnvironmentITCase.scala @@ -165,25 +165,6 @@ class TableEnvironmentITCase( } @Test - def testToDataSetWithTypeOfCRow(): Unit = { - val env = ExecutionEnvironment.getExecutionEnvironment - val tEnv = TableEnvironment.getTableEnvironment(env, config) - - val t = CollectionDataSets.get3TupleDataSet(env) - .toTable(tEnv, 'a, 'b, 'c) - .select('a, 'b, 'c) - - val expected = "+1,1,Hi\n" + "+2,2,Hello\n" + "+3,2,Hello world\n" + - "+4,3,Hello world, how are you?\n" + "+5,3,I am fine.\n" + "+6,3,Luke Skywalker\n" + - "+7,4,Comment#1\n" + "+8,4,Comment#2\n" + "+9,4,Comment#3\n" + "+10,4,Comment#4\n" + - "+11,5,Comment#5\n" + "+12,5,Comment#6\n" + "+13,5,Comment#7\n" + "+14,5,Comment#8\n" + - "+15,5,Comment#9\n" + "+16,6,Comment#10\n" + "+17,6,Comment#11\n" + "+18,6,Comment#12\n" + - "+19,6,Comment#13\n" + "+20,6,Comment#14\n" + "+21,6,Comment#15\n" - val results = t.toDataSet[CRow].collect() - TestBaseUtils.compareResultAsText(results.asJava, expected) - } - - @Test def testToTableFromCaseClass(): Unit = { val env = ExecutionEnvironment.getExecutionEnvironment val tEnv = TableEnvironment.getTableEnvironment(env, config) 
http://git-wip-us.apache.org/repos/asf/flink/blob/bfed279f/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/RetractionITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/RetractionITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/RetractionITCase.scala
index dde7f89..d490763 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/RetractionITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/RetractionITCase.scala
@@ -67,13 +67,12 @@ class RetractionITCase extends StreamingWithStateTestBase {
       .groupBy('count)
       .select('count, 'word.count as 'frequency)
 
-    // to DataStream with CRow
-    val results = resultTable.toDataStream[CRow]
-    results.addSink(new StreamITCase.StringSinkWithCRow)
+    val results = resultTable.toDataStream[Row]
+    results.addSink(new StreamITCase.StringSink)
 
     env.execute()
 
-    val expected = Seq("+1,1", "+1,2", "+1,1", "+2,1", "+1,2", "+1,1", "+2,2", "+2,1", "+3,1",
-      "+3,0", "+4,1", "+4,0", "+5,1", "+5,0", "+6,1", "+1,2")
+    val expected = Seq("1,1", "1,2", "1,1", "2,1", "1,2", "1,1", "2,2", "2,1", "3,1", "3,0",
+      "4,1", "4,0", "5,1", "5,0", "6,1", "1,2")
     assertEquals(expected.sorted, StreamITCase.testResults.sorted)
   }
http://git-wip-us.apache.org/repos/asf/flink/blob/bfed279f/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSinkITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSinkITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSinkITCase.scala
index c446d64..ceae6c6 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSinkITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSinkITCase.scala
@@ -23,7 +23,7 @@ import java.io.File
 import org.apache.flink.api.scala._
 import org.apache.flink.table.api.scala.stream.utils.StreamTestData
 import org.apache.flink.table.api.scala._
-import org.apache.flink.table.sinks.CsvTableSink
+import org.apache.flink.table.sinks.{CsvTableSink, CsvRetractTableSink}
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
 import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
 import org.apache.flink.table.api.TableEnvironment
@@ -59,5 +59,34 @@ class TableSinkITCase extends StreamingMultipleProgramsTestBase {
 
     TestBaseUtils.compareResultsByLinesInMemory(expected, path)
   }
-
+
+  @Test
+  def testStreamTableSinkNeedRetraction(): Unit = {
+
+    val tmpFile = File.createTempFile("flink-table-sink-test", ".tmp")
+    tmpFile.deleteOnExit()
+    val path = tmpFile.toURI.toString
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    env.setParallelism(4)
+
+    val input = StreamTestData.get3TupleDataStream(env)
+      .map(x => x).setParallelism(1) // DOP 1 here; downstream operators run with DOP 4
+
+    input.toTable(tEnv, 'a, 'b, 'c)
+      .where('a < 5 || 'a > 17)
+      .select('c, 'b)
+      .groupBy('b)
+      .select('b, 'c.count)
+      .writeToSink(new CsvRetractTableSink(path))
+
+    env.execute()
+
+    val expected = Seq(
+      "true,1,1", "true,2,1", "false,2,1", "true,2,2", "true,3,1", "true,6,1", "false,6,1",
+      "true,6,2", "false,6,2", "true,6,3", "false,6,3", "true,6,4").mkString("\n")
+
+    TestBaseUtils.compareResultsByLinesInMemory(expected, path)
+  }
 }
http://git-wip-us.apache.org/repos/asf/flink/blob/bfed279f/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/utils/StreamITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/utils/StreamITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/utils/StreamITCase.scala
index 6c75d53..497869d 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/utils/StreamITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/utils/StreamITCase.scala
@@ -49,12 +49,4 @@ object StreamITCase {
       }
     }
   }
-
-  final class StringSinkWithCRow extends RichSinkFunction[CRow]() {
-    def invoke(value: CRow) {
-      testResults.synchronized {
-        testResults += value.toString
-      }
-    }
-  }
 }