http://git-wip-us.apache.org/repos/asf/flink/blob/f37988c1/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/CsvTableSink.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/CsvTableSink.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/CsvTableSink.scala index 809afd2..7214394 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/CsvTableSink.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/CsvTableSink.scala @@ -25,7 +25,6 @@ import org.apache.flink.types.Row import org.apache.flink.api.java.typeutils.RowTypeInfo import org.apache.flink.core.fs.FileSystem.WriteMode import org.apache.flink.streaming.api.datastream.DataStream -import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2} /** * A simple [[TableSink]] to emit data as CSV files. @@ -40,7 +39,7 @@ class CsvTableSink( fieldDelim: Option[String], numFiles: Option[Int], writeMode: Option[WriteMode]) - extends TableSinkBase[Row] with BatchTableSink[Row] with StreamTableSink[Row] { + extends TableSinkBase[Row] with BatchTableSink[Row] with AppendStreamTableSink[Row] { /** * A simple [[TableSink]] to emit data as CSV files. @@ -134,100 +133,3 @@ class CsvFormatter(fieldDelim: String) extends MapFunction[Row, String] { builder.mkString } } - -/** - * A simple [[TableSink]] to emit data as CSV files. - * - * @param path The output path to write the Table to. - * @param fieldDelim The field delimiter - * @param numFiles The number of files to write to - * @param writeMode The write mode to specify whether existing files are overwritten or not. - */ -class CsvRetractTableSink( - path: String, - fieldDelim: Option[String], - numFiles: Option[Int], - writeMode: Option[WriteMode]) - extends TableSinkBase[Row] with StreamRetractSink[Row] { - - override def needsUpdatesAsRetraction: Boolean = true - - /** - * A simple [[TableSink]] to emit data as CSV files. - * - * @param path The output path to write the Table to. - * @param fieldDelim The field delimiter, ',' by default. - */ - def this(path: String, fieldDelim: String = ",") { - this(path, Some(fieldDelim), None, None) - } - - /** - * A simple [[TableSink]] to emit data as CSV files. - * - * @param path The output path to write the Table to. - * @param fieldDelim The field delimiter. - * @param numFiles The number of files to write to. - * @param writeMode The write mode to specify whether existing files are overwritten or not. 
- */ - def this(path: String, fieldDelim: String, numFiles: Int, writeMode: WriteMode) { - this(path, Some(fieldDelim), Some(numFiles), Some(writeMode)) - } - - - override def emitDataStreamWithChange(dataStream: DataStream[JTuple2[Boolean,Row]]): Unit = { - val csvRows = dataStream - .map(new CsvRetractFormatter(fieldDelim.getOrElse(","))) - .returns(TypeInformation.of(classOf[String])) - - - if (numFiles.isDefined) { - csvRows.setParallelism(numFiles.get) - } - - val sink = writeMode match { - case None => csvRows.writeAsText(path) - case Some(wm) => csvRows.writeAsText(path, wm) - } - - if (numFiles.isDefined) { - sink.setParallelism(numFiles.get) - } - } - - override protected def copy: TableSinkBase[Row] = { - new CsvRetractTableSink(path, fieldDelim, numFiles, writeMode) - } - - override def getOutputType: TypeInformation[Row] = { - new RowTypeInfo(getFieldTypes: _*) - } -} - -/** - * Formats a [[Tuple2]] with change information into a [[String]] with fields separated by the - * field delimiter. - * - * @param fieldDelim The field delimiter. - */ -class CsvRetractFormatter(fieldDelim: String) extends MapFunction[JTuple2[Boolean,Row], String] { - override def map(rowT: JTuple2[Boolean,Row]): String = { - - val row: Row = rowT.f1 - - val builder = new StringBuilder - - builder.append(rowT.f0.toString) - - // write following values - for (i <- 0 until row.getArity) { - builder.append(fieldDelim) - val v = row.getField(i) - if (v != null) { - builder.append(v.toString) - } - } - builder.mkString - } -} -
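The commit replaces the ad-hoc StreamRetractSink/CsvRetractTableSink pair with dedicated sink traits. A minimal sketch of a user-defined sink against the new RetractStreamTableSink trait (added in the next file) could look as follows; the class name PrintRetractSink is hypothetical, and the sketch assumes the TableSink trait exposes getFieldNames/getFieldTypes/configure as used by TableSinkBase:

import java.lang.{Boolean => JBool}

import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.table.sinks.{RetractStreamTableSink, TableSink}
import org.apache.flink.types.Row

// Hypothetical sink that prints each change as a "+"/"-" prefixed line,
// mirroring the removed CsvRetractFormatter.
class PrintRetractSink(fieldNames: Array[String], fieldTypes: Array[TypeInformation[_]])
  extends RetractStreamTableSink[Row] {

  // Record type of the data portion (the Row inside the Tuple2).
  override def getRecordType: TypeInformation[Row] = new RowTypeInfo(fieldTypes: _*)

  override def getFieldNames: Array[String] = fieldNames

  override def getFieldTypes: Array[TypeInformation[_]] = fieldTypes

  // writeToSink() calls configure() with the table's schema before emitting.
  override def configure(
      fieldNames: Array[String],
      fieldTypes: Array[TypeInformation[_]]): TableSink[JTuple2[JBool, Row]] =
    new PrintRetractSink(fieldNames, fieldTypes)

  override def emitDataStream(dataStream: DataStream[JTuple2[JBool, Row]]): Unit = {
    dataStream
      .map(new MapFunction[JTuple2[JBool, Row], String] {
        override def map(v: JTuple2[JBool, Row]): String =
          (if (v.f0) "+" else "-") + v.f1.toString
      })
      .returns(TypeInformation.of(classOf[String]))
      .print()
  }
}

An updating table, e.g. the result of a non-windowed groupBy/select, could then be emitted via table.writeToSink(new PrintRetractSink(Array(), Array())); the constructor arguments are placeholders, since the planner calls configure() with the actual schema and encodes updates as retract/accumulate message pairs as described in the RetractStreamTableSink docs below.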
http://git-wip-us.apache.org/repos/asf/flink/blob/f37988c1/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/RetractStreamTableSink.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/RetractStreamTableSink.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/RetractStreamTableSink.scala new file mode 100644 index 0000000..3ab997e --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/RetractStreamTableSink.scala @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.sinks + +import java.lang.{Boolean => JBool} + +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.typeutils.TupleTypeInfo +import org.apache.flink.table.api.Types + +import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2} +import org.apache.flink.streaming.api.datastream.DataStream +import org.apache.flink.table.api.Table + +/** + * Defines an external [[TableSink]] to emit a streaming [[Table]] with insert, update, and delete + * changes. + * + * The table will be converted into a stream of accumulate and retraction messages which are + * encoded as [[JTuple2]]. + * The first field is a [[JBool]] flag to indicate the message type. + * The second field holds the record of the requested type [[T]]. + * + * A message with true [[JBool]] flag is an accumulate (or add) message. + * A message with false flag is a retract message. + * + * @tparam T Type of records that this [[TableSink]] expects and supports. + */ +trait RetractStreamTableSink[T] extends TableSink[JTuple2[JBool, T]] { + + /** Returns the requested record type */ + def getRecordType: TypeInformation[T] + + /** Emits the DataStream. */ + def emitDataStream(dataStream: DataStream[JTuple2[JBool, T]]): Unit + + override def getOutputType = new TupleTypeInfo(Types.BOOLEAN, getRecordType) + +} http://git-wip-us.apache.org/repos/asf/flink/blob/f37988c1/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/StreamRetractSink.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/StreamRetractSink.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/StreamRetractSink.scala deleted file mode 100644 index 7f7c944..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/StreamRetractSink.scala +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.sinks - -import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2} -import org.apache.flink.streaming.api.datastream.DataStream - -trait StreamRetractSink[T] extends TableSink[T]{ - - /** - * Whether the [[StreamTableSink]] requires that update and delete changes are sent with - * retraction messages. - */ - def needsUpdatesAsRetraction: Boolean = false - - /** Emits the DataStream with change infomation. */ - def emitDataStreamWithChange(dataStream: DataStream[JTuple2[Boolean,T]]): Unit - -} http://git-wip-us.apache.org/repos/asf/flink/blob/f37988c1/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/StreamTableSink.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/StreamTableSink.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/StreamTableSink.scala deleted file mode 100644 index 360252e..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/StreamTableSink.scala +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.sinks - -import org.apache.flink.streaming.api.datastream.DataStream -import org.apache.flink.table.api.Table - -/** Defines an external [[TableSink]] to emit a batch [[Table]]. - * - * @tparam T Type of [[DataStream]] that this [[TableSink]] expects and supports. - */ -trait StreamTableSink[T] extends TableSink[T] { - - /** Emits the DataStream. 
*/ - def emitDataStream(dataStream: DataStream[T]): Unit -} http://git-wip-us.apache.org/repos/asf/flink/blob/f37988c1/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/UpsertStreamTableSink.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/UpsertStreamTableSink.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/UpsertStreamTableSink.scala new file mode 100644 index 0000000..2ae3406 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/UpsertStreamTableSink.scala @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.sinks + +import java.lang.{Boolean => JBool} + +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2} +import org.apache.flink.api.java.typeutils.TupleTypeInfo +import org.apache.flink.streaming.api.datastream.DataStream +import org.apache.flink.table.api.{Table, Types} + +/** + * Defines an external [[TableSink]] to emit a streaming [[Table]] with insert, update, and delete + * changes. The [[Table]] must have unique key fields (atomic or composite) or be append-only. + * + * If the [[Table]] does not have a unique key and is not append-only, a + * [[org.apache.flink.table.api.TableException]] will be thrown. + * + * The unique key of the table is configured by the [[UpsertStreamTableSink#setKeyFields()]] + * method. + * + * The [[Table]] will be converted into a stream of upsert and delete messages which are encoded as + * [[JTuple2]]. The first field is a [[JBool]] flag to indicate the message type. The second field + * holds the record of the requested type [[T]]. + * + * A message with true [[JBool]] flag is an upsert message for the configured key. + * A message with false flag is a delete message for the configured key. + * + * If the table is append-only, all messages will have a true flag and must be interpreted + * as insertions. + * + * @tparam T Type of records that this [[TableSink]] expects and supports. + */ +trait UpsertStreamTableSink[T] extends TableSink[JTuple2[JBool, T]] { + + /** + * Configures the unique key fields of the [[Table]] to write. + * The method is called after [[TableSink.configure()]]. + * + * The keys array might be empty, if the table consists of a single (updated) record. + * If the table does not have a key and is append-only, the keys attribute is null. + + * @param keys the field names of the table's keys, an empty array if the table has a single + * row, and null if the table is append-only and has no key.
+ */ + def setKeyFields(keys: Array[String]): Unit + + /** + * Specifies whether the [[Table]] to write is append-only or not. + * + * @param isAppendOnly true if the table is append-only, false otherwise. + */ + def setIsAppendOnly(isAppendOnly: Boolean): Unit + + /** Returns the requested record type */ + def getRecordType: TypeInformation[T] + + /** Emits the DataStream. */ + def emitDataStream(dataStream: DataStream[JTuple2[JBool, T]]): Unit + + override def getOutputType = new TupleTypeInfo(Types.BOOLEAN, getRecordType) +} http://git-wip-us.apache.org/repos/asf/flink/blob/f37988c1/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala index 675e5d9..ba3b591 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala @@ -21,7 +21,6 @@ package org.apache.flink.table import org.apache.flink.api.scala._ import org.apache.flink.api.common.typeinfo.BasicTypeInfo._ import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.api.java.typeutils.{TupleTypeInfo, TypeExtractor} import org.apache.flink.table.api.scala._ import org.apache.flink.api.java.typeutils.{GenericTypeInfo, RowTypeInfo, TupleTypeInfo, TypeExtractor} import org.apache.flink.table.api.TableException http://git-wip-us.apache.org/repos/asf/flink/blob/f37988c1/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/RetractionITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/RetractionITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/RetractionITCase.scala index d490763..40f4c7d 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/RetractionITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/RetractionITCase.scala @@ -27,10 +27,8 @@ import org.junit.Test import org.apache.flink.table.api.scala._ import org.apache.flink.api.scala._ import org.apache.flink.streaming.api.TimeCharacteristic -import org.apache.flink.table.runtime.types.CRow import org.apache.flink.table.utils.TableFunc0 -import scala.collection.mutable /** * tests for retraction @@ -55,51 +53,47 @@ class RetractionITCase extends StreamingWithStateTestBase { def testWordCount(): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment val tEnv = TableEnvironment.getTableEnvironment(env) - StreamITCase.testResults = mutable.MutableList() - env.setParallelism(1) + StreamITCase.clear env.setStateBackend(getStateBackend) val stream = env.fromCollection(data) val table = stream.toTable(tEnv, 'word, 'num) val resultTable = table .groupBy('word) - .select('word as 'word, 'num.sum as 'count) + .select('num.sum as 'count) .groupBy('count) - .select('count, 'word.count as 'frequency) + .select('count, 'count.count as 'frequency) - val results = resultTable.toDataStream[Row] - results.addSink(new StreamITCase.StringSink) + val results = resultTable.toRetractStream[Row] + results.addSink(new 
StreamITCase.RetractingSink) env.execute() - val expected = Seq("1,1", "1,2", "1,1", "2,1", "1,2", "1,1", "2,2", "2,1", "3,1", "3,0", - "4,1", "4,0", "5,1", "5,0", "6,1", "1,2") - assertEquals(expected.sorted, StreamITCase.testResults.sorted) + val expected = Seq("1,2", "2,1", "6,1") + assertEquals(expected.sorted, StreamITCase.retractedResults.sorted) } // keyed groupby + non-keyed groupby @Test def testGroupByAndNonKeyedGroupBy(): Unit = { - val env = StreamExecutionEnvironment.getExecutionEnvironment val tEnv = TableEnvironment.getTableEnvironment(env) - StreamITCase.testResults = mutable.MutableList() - env.setParallelism(1) + StreamITCase.clear env.setStateBackend(getStateBackend) val stream = env.fromCollection(data) val table = stream.toTable(tEnv, 'word, 'num) val resultTable = table .groupBy('word) - .select('word as 'word, 'num.sum as 'count) - .select('count.sum) + .select('word as 'word, 'num.sum as 'cnt) + .select('cnt.sum) + + val results = resultTable.toRetractStream[Row] - val results = resultTable.toDataStream[Row] - results.addSink(new StreamITCase.StringSink) + results.addSink(new StreamITCase.RetractingSink).setParallelism(1) env.execute() - val expected = Seq("1", "2", "1", "3", "4", "3", "5", "3", "6", "3", "7", "3", "8", "3", "9", - "10") - assertEquals(expected.sorted, StreamITCase.testResults.sorted) + val expected = Seq("10") + assertEquals(expected.sorted, StreamITCase.retractedResults.sorted) } // non-keyed groupby + keyed groupby @@ -108,8 +102,7 @@ class RetractionITCase extends StreamingWithStateTestBase { val env = StreamExecutionEnvironment.getExecutionEnvironment val tEnv = TableEnvironment.getTableEnvironment(env) - StreamITCase.testResults = mutable.MutableList() - env.setParallelism(1) + StreamITCase.clear env.setStateBackend(getStateBackend) val stream = env.fromCollection(data) @@ -119,13 +112,12 @@ class RetractionITCase extends StreamingWithStateTestBase { .groupBy('count) .select('count, 'count.count) - val results = resultTable.toDataStream[Row] - results.addSink(new StreamITCase.StringSink) + val results = resultTable.toRetractStream[Row] + results.addSink(new StreamITCase.RetractingSink) env.execute() - val expected = Seq("1,1", "1,0", "2,1", "2,0", "3,1", "3,0", "4,1", "4,0", "5,1", "5,0", "6," + - "1", "6,0", "7,1", "7,0", "8,1", "8,0", "9,1", "9,0", "10,1") - assertEquals(expected.sorted, StreamITCase.testResults.sorted) + val expected = Seq("10,1") + assertEquals(expected.sorted, StreamITCase.retractedResults.sorted) } // test unique process, if the current output message of unbounded groupby equals the @@ -150,9 +142,9 @@ class RetractionITCase extends StreamingWithStateTestBase { ) val env = StreamExecutionEnvironment.getExecutionEnvironment val tEnv = TableEnvironment.getTableEnvironment(env) - StreamITCase.testResults = mutable.MutableList() - env.setParallelism(1) + StreamITCase.clear env.setStateBackend(getStateBackend) + env.setParallelism(1) val stream = env.fromCollection(data) val table = stream.toTable(tEnv, 'pk, 'value) @@ -162,12 +154,13 @@ class RetractionITCase extends StreamingWithStateTestBase { .groupBy('sum) .select('sum, 'pk.count as 'count) - val results = resultTable.toDataStream[Row] - results.addSink(new StreamITCase.StringSink) + val results = resultTable.toRetractStream[Row] + results.addSink(new StreamITCase.RetractMessagesSink) env.execute() - val expected = Seq("1,1", "2,1", "3,1", "3,0", "6,1", "1,2", "1,3", "6,2", "6,1", "12,1","12," + - "0", "18,1", "8,1") + val expected = Seq( + "+1,1", "+2,1", "+3,1", 
"-3,1", "+6,1", "-1,1", "+1,2", "-1,2", "+1,3", "-6,1", "+6,2", + "-6,2", "+6,1", "+12,1", "-12,1", "+18,1", "+8,1") assertEquals(expected.sorted, StreamITCase.testResults.sorted) } @@ -176,8 +169,7 @@ class RetractionITCase extends StreamingWithStateTestBase { def testCorrelate(): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment val tEnv = TableEnvironment.getTableEnvironment(env) - StreamITCase.testResults = mutable.MutableList() - env.setParallelism(1) + StreamITCase.clear env.setStateBackend(getStateBackend) val func0 = new TableFunc0 @@ -186,19 +178,17 @@ class RetractionITCase extends StreamingWithStateTestBase { val table = stream.toTable(tEnv, 'word, 'num) val resultTable = table .groupBy('word) - .select('word as 'word, 'num.sum as 'count) + .select('word as 'word, 'num.sum as 'cnt) .leftOuterJoin(func0('word)) - .groupBy('count) - .select('count, 'word.count as 'frequency) + .groupBy('cnt) + .select('cnt, 'word.count as 'frequency) - val results = resultTable.toDataStream[Row] - results.addSink(new StreamITCase.StringSink) + val results = resultTable.toRetractStream[Row] + results.addSink(new StreamITCase.RetractingSink) env.execute() - val expected = Seq( - "1,1", "1,2", "1,1", "2,1", "1,2", "1,1", "2,2", "2,1", "3,1", "3,0", "4,1", "4,0", "5,1", - "5,0", "6,1", "1,2") - assertEquals(expected.sorted, StreamITCase.testResults.sorted) + val expected = Seq("1,2", "2,1", "6,1") + assertEquals(expected.sorted, StreamITCase.retractedResults.sorted) } } http://git-wip-us.apache.org/repos/asf/flink/blob/f37988c1/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSinkITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSinkITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSinkITCase.scala index ceae6c6..c446d64 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSinkITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSinkITCase.scala @@ -23,7 +23,7 @@ import java.io.File import org.apache.flink.api.scala._ import org.apache.flink.table.api.scala.stream.utils.StreamTestData import org.apache.flink.table.api.scala._ -import org.apache.flink.table.sinks.{CsvTableSink, CsvRetractTableSink} +import org.apache.flink.table.sinks.CsvTableSink import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase import org.apache.flink.table.api.TableEnvironment @@ -59,34 +59,5 @@ class TableSinkITCase extends StreamingMultipleProgramsTestBase { TestBaseUtils.compareResultsByLinesInMemory(expected, path) } - - @Test - def testStreamTableSinkNeedRetraction(): Unit = { - - val tmpFile = File.createTempFile("flink-table-sink-test", ".tmp") - tmpFile.deleteOnExit() - val path = tmpFile.toURI.toString - - val env = StreamExecutionEnvironment.getExecutionEnvironment - val tEnv = TableEnvironment.getTableEnvironment(env) - env.setParallelism(4) - - val input = StreamTestData.get3TupleDataStream(env) - .map(x => x).setParallelism(1) // increase DOP to 4 - - val results = input.toTable(tEnv, 'a, 'b, 'c) - .where('a < 5 || 'a > 17) - .select('c, 'b) - .groupBy('b) - .select('b, 'c.count) - .writeToSink(new CsvRetractTableSink(path)) - - env.execute() - - val expected = Seq( - "true,1,1", 
"true,2,1", "false,2,1", "true,2,2", "true,3,1", "true,6,1", "false,6,1", - "true,6,2", "false,6,2", "true,6,3", "false,6,3", "true,6,4").mkString("\n") - - TestBaseUtils.compareResultsByLinesInMemory(expected, path) - } + } http://git-wip-us.apache.org/repos/asf/flink/blob/f37988c1/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala index abbcbdd..249d505 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala @@ -53,19 +53,19 @@ class SqlITCase extends StreamingWithStateTestBase { val env = StreamExecutionEnvironment.getExecutionEnvironment val tEnv = TableEnvironment.getTableEnvironment(env) - StreamITCase.testResults = mutable.MutableList() + StreamITCase.clear val sqlQuery = "SELECT b, COUNT(a) FROM MyTable GROUP BY b" - val t = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c) + val t = StreamTestData.get3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c) tEnv.registerTable("MyTable", t) - val result = tEnv.sql(sqlQuery).toDataStream[Row] - result.addSink(new StreamITCase.StringSink) + val result = tEnv.sql(sqlQuery).toRetractStream[Row] + result.addSink(new StreamITCase.RetractingSink).setParallelism(1) env.execute() - val expected = mutable.MutableList("1,1", "2,1", "2,2") - assertEquals(expected.sorted, StreamITCase.testResults.sorted) + val expected = List("1,1", "2,2", "3,3", "4,4", "5,5", "6,6") + assertEquals(expected.sorted, StreamITCase.retractedResults.sorted) } /** test selection **/ @@ -74,7 +74,7 @@ class SqlITCase extends StreamingWithStateTestBase { val env = StreamExecutionEnvironment.getExecutionEnvironment val tEnv = TableEnvironment.getTableEnvironment(env) - StreamITCase.testResults = mutable.MutableList() + StreamITCase.clear val sqlQuery = "SELECT a * 2, b - 1 FROM MyTable" @@ -85,7 +85,7 @@ class SqlITCase extends StreamingWithStateTestBase { result.addSink(new StreamITCase.StringSink) env.execute() - val expected = mutable.MutableList("2,0", "4,1", "6,1") + val expected = List("2,0", "4,1", "6,1") assertEquals(expected.sorted, StreamITCase.testResults.sorted) } @@ -95,7 +95,7 @@ class SqlITCase extends StreamingWithStateTestBase { val env = StreamExecutionEnvironment.getExecutionEnvironment val tEnv = TableEnvironment.getTableEnvironment(env) - StreamITCase.testResults = mutable.MutableList() + StreamITCase.clear val sqlQuery = "SELECT * FROM MyTable WHERE a = 3" @@ -106,7 +106,7 @@ class SqlITCase extends StreamingWithStateTestBase { result.addSink(new StreamITCase.StringSink) env.execute() - val expected = mutable.MutableList("3,2,Hello world") + val expected = List("3,2,Hello world") assertEquals(expected.sorted, StreamITCase.testResults.sorted) } @@ -116,7 +116,7 @@ class SqlITCase extends StreamingWithStateTestBase { val env = StreamExecutionEnvironment.getExecutionEnvironment val tEnv = TableEnvironment.getTableEnvironment(env) - StreamITCase.testResults = mutable.MutableList() + StreamITCase.clear val sqlQuery = "SELECT * FROM MyTable WHERE _1 = 3" @@ -127,7 +127,7 @@ class SqlITCase extends StreamingWithStateTestBase { 
result.addSink(new StreamITCase.StringSink) env.execute() - val expected = mutable.MutableList("3,2,Hello world") + val expected = List("3,2,Hello world") assertEquals(expected.sorted, StreamITCase.testResults.sorted) } @@ -136,7 +136,7 @@ class SqlITCase extends StreamingWithStateTestBase { def testUnion(): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment val tEnv = TableEnvironment.getTableEnvironment(env) - StreamITCase.testResults = mutable.MutableList() + StreamITCase.clear val sqlQuery = "SELECT * FROM T1 " + "UNION ALL " + @@ -151,7 +151,7 @@ class SqlITCase extends StreamingWithStateTestBase { result.addSink(new StreamITCase.StringSink) env.execute() - val expected = mutable.MutableList( + val expected = List( "1,1,Hi", "1,1,Hi", "2,2,Hello", "2,2,Hello", "3,2,Hello world", "3,2,Hello world") @@ -163,7 +163,7 @@ class SqlITCase extends StreamingWithStateTestBase { def testUnionWithFilter(): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment val tEnv = TableEnvironment.getTableEnvironment(env) - StreamITCase.testResults = mutable.MutableList() + StreamITCase.clear val sqlQuery = "SELECT * FROM T1 WHERE a = 3 " + "UNION ALL " + @@ -178,7 +178,7 @@ class SqlITCase extends StreamingWithStateTestBase { result.addSink(new StreamITCase.StringSink) env.execute() - val expected = mutable.MutableList( + val expected = List( "2,2,Hello", "3,2,Hello world") assertEquals(expected.sorted, StreamITCase.testResults.sorted) @@ -189,7 +189,7 @@ class SqlITCase extends StreamingWithStateTestBase { def testUnionTableWithDataSet(): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment val tEnv = TableEnvironment.getTableEnvironment(env) - StreamITCase.testResults = mutable.MutableList() + StreamITCase.clear val sqlQuery = "SELECT c FROM T1 WHERE a = 3 " + "UNION ALL " + @@ -204,7 +204,7 @@ class SqlITCase extends StreamingWithStateTestBase { result.addSink(new StreamITCase.StringSink) env.execute() - val expected = mutable.MutableList("Hello", "Hello world") + val expected = List("Hello", "Hello world") assertEquals(expected.sorted, StreamITCase.testResults.sorted) } @@ -213,7 +213,7 @@ class SqlITCase extends StreamingWithStateTestBase { val env = StreamExecutionEnvironment.getExecutionEnvironment env.setStateBackend(getStateBackend) val tEnv = TableEnvironment.getTableEnvironment(env) - StreamITCase.testResults = mutable.MutableList() + StreamITCase.clear // for sum aggregation ensure that every time the order of each element is consistent env.setParallelism(1) @@ -232,7 +232,7 @@ class SqlITCase extends StreamingWithStateTestBase { result.addSink(new StreamITCase.StringSink) env.execute() - val expected = mutable.MutableList( + val expected = List( "Hello World,1,7", "Hello World,2,15", "Hello World,3,35", "Hello,1,1", "Hello,2,3", "Hello,3,6", "Hello,4,10", "Hello,5,15", "Hello,6,21") assertEquals(expected.sorted, StreamITCase.testResults.sorted) @@ -243,7 +243,7 @@ class SqlITCase extends StreamingWithStateTestBase { val env = StreamExecutionEnvironment.getExecutionEnvironment env.setStateBackend(getStateBackend) val tEnv = TableEnvironment.getTableEnvironment(env) - StreamITCase.testResults = mutable.MutableList() + StreamITCase.clear val t1 = env.fromCollection(data).toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime) @@ -259,7 +259,7 @@ class SqlITCase extends StreamingWithStateTestBase { result.addSink(new StreamITCase.StringSink) env.execute() - val expected = mutable.MutableList( + val expected = List( "Hello World,1", "Hello World,2", "Hello 
World,3", "Hello,1", "Hello,2", "Hello,3", "Hello,4", "Hello,5", "Hello,6") assertEquals(expected.sorted, StreamITCase.testResults.sorted) @@ -270,7 +270,7 @@ class SqlITCase extends StreamingWithStateTestBase { val env = StreamExecutionEnvironment.getExecutionEnvironment env.setStateBackend(getStateBackend) val tEnv = TableEnvironment.getTableEnvironment(env) - StreamITCase.testResults = mutable.MutableList() + StreamITCase.clear // for sum aggregation ensure that every time the order of each element is consistent env.setParallelism(1) @@ -289,7 +289,7 @@ class SqlITCase extends StreamingWithStateTestBase { result.addSink(new StreamITCase.StringSink) env.execute() - val expected = mutable.MutableList( + val expected = List( "Hello World,7,28", "Hello World,8,36", "Hello World,9,56", "Hello,1,1", "Hello,2,3", "Hello,3,6", "Hello,4,10", "Hello,5,15", "Hello,6,21") assertEquals(expected.sorted, StreamITCase.testResults.sorted) @@ -300,7 +300,7 @@ class SqlITCase extends StreamingWithStateTestBase { val env = StreamExecutionEnvironment.getExecutionEnvironment env.setStateBackend(getStateBackend) val tEnv = TableEnvironment.getTableEnvironment(env) - StreamITCase.testResults = mutable.MutableList() + StreamITCase.clear val t1 = env.fromCollection(data).toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime) @@ -314,7 +314,7 @@ class SqlITCase extends StreamingWithStateTestBase { result.addSink(new StreamITCase.StringSink) env.execute() - val expected = mutable.MutableList("1", "2", "3", "4", "5", "6", "7", "8", "9") + val expected = List("1", "2", "3", "4", "5", "6", "7", "8", "9") assertEquals(expected.sorted, StreamITCase.testResults.sorted) } @@ -363,7 +363,7 @@ class SqlITCase extends StreamingWithStateTestBase { result.addSink(new StreamITCase.StringSink) env.execute() - val expected = mutable.MutableList( + val expected = List( "Hello,1,1,1", "Hello,1,2,2", "Hello,1,3,3", "Hello,2,3,4", "Hello,2,3,5", "Hello,2,3,6", "Hello,3,3,7", "Hello,4,3,9", "Hello,5,3,12", @@ -420,7 +420,7 @@ class SqlITCase extends StreamingWithStateTestBase { result.addSink(new StreamITCase.StringSink) env.execute() - val expected = mutable.MutableList( + val expected = List( "Hello,1,1,1", "Hello,1,2,2", "Hello,1,3,3", "Hello,2,3,4", "Hello,2,3,5", "Hello,2,3,6", "Hello,3,3,7", @@ -490,7 +490,7 @@ class SqlITCase extends StreamingWithStateTestBase { result.addSink(new StreamITCase.StringSink) env.execute() - val expected = mutable.MutableList( + val expected = List( "Hello,1,1,1", "Hello,15,2,2", "Hello,16,3,3", "Hello,2,6,9", "Hello,3,6,9", "Hello,2,6,9", "Hello,3,4,9", @@ -562,7 +562,7 @@ class SqlITCase extends StreamingWithStateTestBase { result.addSink(new StreamITCase.StringSink) env.execute() - val expected = mutable.MutableList( + val expected = List( "Hello,1,1,1", "Hello,15,2,2", "Hello,16,3,3", "Hello,2,6,9", "Hello,3,6,9", "Hello,2,6,9", "Hello,3,4,9", @@ -584,7 +584,7 @@ class SqlITCase extends StreamingWithStateTestBase { val env = StreamExecutionEnvironment.getExecutionEnvironment env.setStateBackend(getStateBackend) val tEnv = TableEnvironment.getTableEnvironment(env) - StreamITCase.testResults = mutable.MutableList() + StreamITCase.clear val t1 = env.fromCollection(data).toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime) @@ -608,7 +608,7 @@ class SqlITCase extends StreamingWithStateTestBase { val tEnv = TableEnvironment.getTableEnvironment(env) env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) env.setStateBackend(getStateBackend) - StreamITCase.testResults = mutable.MutableList() + 
StreamITCase.clear env.setParallelism(1) val sqlQuery = "SELECT a, b, c, " + @@ -661,7 +661,7 @@ class SqlITCase extends StreamingWithStateTestBase { result.addSink(new StreamITCase.StringSink) env.execute() - val expected = mutable.MutableList( + val expected = List( "1,2,Hello,2,1,2,2,2", "1,3,Hello world,5,2,2,3,2", "1,1,Hi,6,3,2,3,1", @@ -687,7 +687,7 @@ class SqlITCase extends StreamingWithStateTestBase { val tEnv = TableEnvironment.getTableEnvironment(env) env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) env.setStateBackend(getStateBackend) - StreamITCase.testResults = mutable.MutableList() + StreamITCase.clear val sqlQuery = "SELECT a, b, c, " + "SUM(b) over (" + @@ -731,7 +731,7 @@ class SqlITCase extends StreamingWithStateTestBase { result.addSink(new StreamITCase.StringSink) env.execute() - val expected = mutable.MutableList( + val expected = List( "1,2,Hello,2,1,2,2,2", "1,3,Hello world,5,2,2,3,2", "1,1,Hi,6,3,2,3,1", @@ -757,7 +757,7 @@ class SqlITCase extends StreamingWithStateTestBase { val tEnv = TableEnvironment.getTableEnvironment(env) env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) env.setStateBackend(getStateBackend) - StreamITCase.testResults = mutable.MutableList() + StreamITCase.clear env.setParallelism(1) val sqlQuery = "SELECT a, b, c, " + @@ -793,7 +793,7 @@ class SqlITCase extends StreamingWithStateTestBase { result.addSink(new StreamITCase.StringSink) env.execute() - val expected = mutable.MutableList( + val expected = List( "2,2,Hello,2,1,2,2,2", "3,5,Hello,7,2,3,5,2", "1,3,Hello,10,3,3,5,2", @@ -812,7 +812,7 @@ class SqlITCase extends StreamingWithStateTestBase { val tEnv = TableEnvironment.getTableEnvironment(env) env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) env.setStateBackend(getStateBackend) - StreamITCase.testResults = mutable.MutableList() + StreamITCase.clear env.setParallelism(1) val sqlQuery = "SELECT a, b, c, " + @@ -849,7 +849,7 @@ class SqlITCase extends StreamingWithStateTestBase { result.addSink(new StreamITCase.StringSink) env.execute() - val expected = mutable.MutableList( + val expected = List( "2,2,Hello,2,1,2,2,2", "3,5,Hello,7,2,3,5,2", "1,3,Hello,10,3,3,5,2", @@ -869,7 +869,7 @@ class SqlITCase extends StreamingWithStateTestBase { val tEnv = TableEnvironment.getTableEnvironment(env) env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) env.setStateBackend(getStateBackend) - StreamITCase.testResults = mutable.MutableList() + StreamITCase.clear env.setParallelism(1) val sqlQuery = "SELECT a, b, c, " + @@ -907,7 +907,7 @@ class SqlITCase extends StreamingWithStateTestBase { result.addSink(new StreamITCase.StringSink) env.execute() - val expected = mutable.MutableList( + val expected = List( "2,1,Hello,1,1,1,1,1", "1,1,Hello,7,4,1,3,1", "1,2,Hello,7,4,1,3,1", @@ -932,7 +932,7 @@ class SqlITCase extends StreamingWithStateTestBase { val tEnv = TableEnvironment.getTableEnvironment(env) env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) env.setStateBackend(getStateBackend) - StreamITCase.testResults = mutable.MutableList() + StreamITCase.clear env.setParallelism(1) val sqlQuery = "SELECT a, b, c, " + @@ -975,7 +975,7 @@ class SqlITCase extends StreamingWithStateTestBase { result.addSink(new StreamITCase.StringSink) env.execute() - val expected = mutable.MutableList( + val expected = List( "1,1,Hello,6,3,2,3,1", "1,2,Hello,6,3,2,3,1", "1,3,Hello world,6,3,2,3,1", @@ -1000,7 +1000,7 @@ class SqlITCase extends StreamingWithStateTestBase { env.setStateBackend(getStateBackend) val tEnv = 
TableEnvironment.getTableEnvironment(env) env.setParallelism(1) - StreamITCase.testResults = mutable.MutableList() + StreamITCase.clear val t = StreamTestData.get5TupleDataStream(env) .toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'proctime.proctime) @@ -1017,7 +1017,7 @@ class SqlITCase extends StreamingWithStateTestBase { result.addSink(new StreamITCase.StringSink) env.execute() - val expected = mutable.MutableList( + val expected = List( "1,0,0", "2,1,1", "2,3,1", @@ -1043,7 +1043,7 @@ class SqlITCase extends StreamingWithStateTestBase { env.setStateBackend(getStateBackend) val tEnv = TableEnvironment.getTableEnvironment(env) env.setParallelism(1) - StreamITCase.testResults = mutable.MutableList() + StreamITCase.clear val t = StreamTestData.get5TupleDataStream(env) .toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'proctime.proctime) @@ -1060,7 +1060,7 @@ class SqlITCase extends StreamingWithStateTestBase { result.addSink(new StreamITCase.StringSink) env.execute() - val expected = mutable.MutableList( + val expected = List( "1,0,0", "2,1,1", "2,3,1", @@ -1087,7 +1087,7 @@ class SqlITCase extends StreamingWithStateTestBase { env.setStateBackend(getStateBackend) val tEnv = TableEnvironment.getTableEnvironment(env) env.setParallelism(1) - StreamITCase.testResults = mutable.MutableList() + StreamITCase.clear val t = StreamTestData.get5TupleDataStream(env) .toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'proctime.proctime) @@ -1104,7 +1104,7 @@ class SqlITCase extends StreamingWithStateTestBase { result.addSink(new StreamITCase.StringSink) env.execute() - val expected = mutable.MutableList( + val expected = List( "1,0,0", "2,1,0", "2,3,0", @@ -1130,7 +1130,7 @@ class SqlITCase extends StreamingWithStateTestBase { env.setStateBackend(getStateBackend) val tEnv = TableEnvironment.getTableEnvironment(env) env.setParallelism(1) - StreamITCase.testResults = mutable.MutableList() + StreamITCase.clear val t = StreamTestData.get5TupleDataStream(env) .toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'proctime.proctime) @@ -1146,7 +1146,7 @@ class SqlITCase extends StreamingWithStateTestBase { result.addSink(new StreamITCase.StringSink) env.execute() - val expected = mutable.MutableList( + val expected = List( "1,0,0", "2,1,0", "2,3,0", http://git-wip-us.apache.org/repos/asf/flink/blob/f37988c1/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupAggregationsITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupAggregationsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupAggregationsITCase.scala index 271e90b..910cbf2 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupAggregationsITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupAggregationsITCase.scala @@ -23,6 +23,7 @@ import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.table.api.scala._ import org.apache.flink.table.api.scala.stream.utils.{StreamITCase, StreamTestData, StreamingWithStateTestBase} import org.apache.flink.table.api.TableEnvironment +import org.apache.flink.table.api.scala.stream.utils.StreamITCase.RetractingSink import org.apache.flink.types.Row import org.junit.Assert.assertEquals import org.junit.Test @@ -38,29 +39,24 @@ class GroupAggregationsITCase extends StreamingWithStateTestBase { def 
testNonKeyedGroupAggregate(): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment env.setStateBackend(getStateBackend) - env.setParallelism(1) val tEnv = TableEnvironment.getTableEnvironment(env) StreamITCase.clear val t = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c) .select('a.sum, 'b.sum) - val results = t.toDataStream[Row] - results.addSink(new StreamITCase.StringSink) + val results = t.toRetractStream[Row] + results.addSink(new StreamITCase.RetractingSink).setParallelism(1) env.execute() - val expected = mutable.MutableList( - "1,1", "3,3", "6,5", "10,8", "15,11", "21,14", "28,18", "36,22", "45,26", "55,30", "66,35", - "78,40", "91,45", "105,50", "120,55", "136,61", "153,67", "171,73", "190,79", "210,85", - "231,91") - assertEquals(expected.sorted, StreamITCase.testResults.sorted) + val expected = List("231,91") + assertEquals(expected.sorted, StreamITCase.retractedResults.sorted) } @Test def testGroupAggregate(): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment env.setStateBackend(getStateBackend) - env.setParallelism(1) val tEnv = TableEnvironment.getTableEnvironment(env) StreamITCase.clear @@ -68,15 +64,12 @@ class GroupAggregationsITCase extends StreamingWithStateTestBase { .groupBy('b) .select('b, 'a.sum) - val results = t.toDataStream[Row] - results.addSink(new StreamITCase.StringSink) + val results = t.toRetractStream[Row] + results.addSink(new StreamITCase.RetractingSink) env.execute() - val expected = mutable.MutableList( - "1,1", "2,2", "2,5", "3,4", "3,9", "3,15", "4,7", "4,15", - "4,24", "4,34", "5,11", "5,23", "5,36", "5,50", "5,65", "6,16", "6,33", "6,51", "6,70", - "6,90", "6,111") - assertEquals(expected.sorted, StreamITCase.testResults.sorted) + val expected = List("1,1", "2,5", "3,15", "4,34", "5,65", "6,111") + assertEquals(expected.sorted, StreamITCase.retractedResults.sorted) } @Test @@ -88,30 +81,22 @@ class GroupAggregationsITCase extends StreamingWithStateTestBase { val t = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c) .groupBy('b) - .select('a.sum as 'd, 'b) - .groupBy('b, 'd) - .select('b) + .select('a.count as 'cnt, 'b) + .groupBy('cnt) + .select('cnt, 'b.count as 'freq) - val results = t.toDataStream[Row] - results.addSink(new StreamITCase.StringSink) - env.execute() + val results = t.toRetractStream[Row] - val expected = mutable.MutableList( - "1", - "2", "2", - "3", "3", "3", - "4", "4", "4", "4", - "5", "5", "5", "5", "5", - "6", "6", "6", "6", "6", "6") - - assertEquals(expected.sorted, StreamITCase.testResults.sorted) + results.addSink(new RetractingSink) + env.execute() + val expected = List("1,1", "2,1", "3,1", "4,1", "5,1", "6,1") + assertEquals(expected.sorted, StreamITCase.retractedResults.sorted) } @Test def testGroupAggregateWithExpression(): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment env.setStateBackend(getStateBackend) - env.setParallelism(1) val tEnv = TableEnvironment.getTableEnvironment(env) StreamITCase.clear @@ -119,14 +104,14 @@ class GroupAggregationsITCase extends StreamingWithStateTestBase { .groupBy('e, 'b % 3) .select('c.min, 'e, 'a.avg, 'd.count) - val results = t.toDataStream[Row] - results.addSink(new StreamITCase.StringSink) + val results = t.toRetractStream[Row] + results.addSink(new RetractingSink) env.execute() val expected = mutable.MutableList( - "0,1,1,1", "1,2,2,1", "2,1,2,1", "3,2,3,1", "1,2,2,2", - "5,3,3,1", "3,2,3,2", "7,1,4,1", "2,1,3,2", "3,2,3,3", "7,1,4,2", "5,3,4,2", "12,3,5,1", - "1,2,3,3", "14,2,5,1") - 
assertEquals(expected.sorted, StreamITCase.testResults.sorted) + "0,1,1,1", "7,1,4,2", "2,1,3,2", + "3,2,3,3", "1,2,3,3", "14,2,5,1", + "12,3,5,1", "5,3,4,2") + assertEquals(expected.sorted, StreamITCase.retractedResults.sorted) } } http://git-wip-us.apache.org/repos/asf/flink/blob/f37988c1/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/OverWindowTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/OverWindowTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/OverWindowTest.scala index ea3ab22..96e5eb5 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/OverWindowTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/OverWindowTest.scala @@ -35,7 +35,7 @@ class OverWindowTest extends TableTestBase { val result = table .window(Over partitionBy 'c orderBy 'rowtime preceding 2.rows as 'w) .select('c, 'b.count over 'x) - streamUtil.tEnv.optimize(result.getRelNode) + streamUtil.tEnv.optimize(result.getRelNode, updatesAsRetraction = true) } @Test(expected = classOf[ValidationException]) @@ -43,7 +43,7 @@ class OverWindowTest extends TableTestBase { val result = table .window(Over partitionBy 'c orderBy 'abc preceding 2.rows as 'w) .select('c, 'b.count over 'w) - streamUtil.tEnv.optimize(result.getRelNode) + streamUtil.tEnv.optimize(result.getRelNode, updatesAsRetraction = true) } @Test(expected = classOf[ValidationException]) @@ -51,7 +51,7 @@ class OverWindowTest extends TableTestBase { val result = table .window(Over partitionBy 'c orderBy 'rowtime preceding 2 following "xx" as 'w) .select('c, 'b.count over 'w) - streamUtil.tEnv.optimize(result.getRelNode) + streamUtil.tEnv.optimize(result.getRelNode, updatesAsRetraction = true) } @Test(expected = classOf[ValidationException]) @@ -59,7 +59,7 @@ class OverWindowTest extends TableTestBase { val result = table .window(Over partitionBy 'c orderBy 'rowtime preceding 2.rows following CURRENT_RANGE as 'w) .select('c, 'b.count over 'w) - streamUtil.tEnv.optimize(result.getRelNode) + streamUtil.tEnv.optimize(result.getRelNode, updatesAsRetraction = true) } @Test(expected = classOf[ValidationException]) @@ -67,7 +67,7 @@ class OverWindowTest extends TableTestBase { val result = table .window(Over partitionBy 'a + 'b orderBy 'rowtime preceding 2.rows as 'w) .select('c, 'b.count over 'w) - streamUtil.tEnv.optimize(result.getRelNode) + streamUtil.tEnv.optimize(result.getRelNode, updatesAsRetraction = true) } @Test(expected = classOf[ValidationException]) @@ -77,7 +77,7 @@ class OverWindowTest extends TableTestBase { val result = table2 .window(Over partitionBy 'c orderBy 'rowtime preceding 2.rows as 'w) .select('c, 'b.count over 'w) - streamUtil.tEnv.optimize(result.getRelNode) + streamUtil.tEnv.optimize(result.getRelNode, updatesAsRetraction = true) } @Test(expected = classOf[ValidationException]) @@ -85,7 +85,7 @@ class OverWindowTest extends TableTestBase { val result = table .window(Over orderBy 'rowtime preceding -1.rows as 'w) .select('c, 'b.count over 'w) - streamUtil.tEnv.optimize(result.getRelNode) + streamUtil.tEnv.optimize(result.getRelNode, updatesAsRetraction = true) } @Test(expected = classOf[ValidationException]) @@ -93,7 +93,7 @@ class OverWindowTest extends TableTestBase { val result = table .window(Over orderBy 'rowtime 
preceding 1.rows following -2.rows as 'w) .select('c, 'b.count over 'w) - streamUtil.tEnv.optimize(result.getRelNode) + streamUtil.tEnv.optimize(result.getRelNode, updatesAsRetraction = true) } @Test(expected = classOf[ValidationException]) @@ -103,7 +103,7 @@ class OverWindowTest extends TableTestBase { val result = table .window(Over orderBy 'rowtime preceding 1.minutes as 'w) .select('c, weightedAvg('b, 'a) over 'w) - streamUtil.tEnv.optimize(result.getRelNode) + streamUtil.tEnv.optimize(result.getRelNode, updatesAsRetraction = true) } @Test http://git-wip-us.apache.org/repos/asf/flink/blob/f37988c1/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/utils/StreamITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/utils/StreamITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/utils/StreamITCase.scala index 497869d..effde8e 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/utils/StreamITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/utils/StreamITCase.scala @@ -25,16 +25,18 @@ import org.junit.Assert._ import scala.collection.mutable import org.apache.flink.streaming.api.functions.sink.RichSinkFunction -import org.apache.flink.table.runtime.types.CRow import scala.collection.JavaConverters._ +import scala.collection.mutable.ArrayBuffer object StreamITCase { - var testResults = mutable.MutableList.empty[String] + var testResults: mutable.MutableList[String] = mutable.MutableList.empty[String] + var retractedResults: ArrayBuffer[String] = mutable.ArrayBuffer.empty[String] def clear = { StreamITCase.testResults.clear() + StreamITCase.retractedResults.clear() } def compareWithList(expected: java.util.List[String]): Unit = { @@ -49,4 +51,33 @@ object StreamITCase { } } } + + final class RetractMessagesSink extends RichSinkFunction[(Boolean, Row)]() { + def invoke(v: (Boolean, Row)) { + testResults.synchronized { + testResults += (if (v._1) "+" else "-") + v._2 + } + } + } + + final class RetractingSink() extends RichSinkFunction[(Boolean, Row)] { + def invoke(v: (Boolean, Row)) { + retractedResults.synchronized { + val value = v._2.toString + if (v._1) { + retractedResults += value + } else { + val idx = retractedResults.indexOf(value) + if (idx >= 0) { + retractedResults.remove(idx) + } else { + throw new RuntimeException("Tried to retract a value that wasn't added first. " + + "This is probably an incorrectly implemented test. 
" + + "Try to set the parallelism of the sink to 1.") + } + } + } + } + } + } http://git-wip-us.apache.org/repos/asf/flink/blob/f37988c1/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/rules/RetractionRulesTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/rules/RetractionRulesTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/rules/RetractionRulesTest.scala index 580029f..861f70e 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/rules/RetractionRulesTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/rules/RetractionRulesTest.scala @@ -292,7 +292,7 @@ class StreamTableTestForRetractionUtil extends StreamTableTestUtil { def verifyTableTrait(resultTable: Table, expected: String): Unit = { val relNode = resultTable.getRelNode - val optimized = tEnv.optimize(relNode) + val optimized = tEnv.optimize(relNode, updatesAsRetraction = false) val actual = TraitUtil.toString(optimized) assertEquals( expected.split("\n").map(_.trim).mkString("\n"), http://git-wip-us.apache.org/repos/asf/flink/blob/f37988c1/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sinks/StreamTableSinksITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sinks/StreamTableSinksITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sinks/StreamTableSinksITCase.scala new file mode 100644 index 0000000..2dfb658 --- /dev/null +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sinks/StreamTableSinksITCase.scala @@ -0,0 +1,511 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.sinks + +import java.lang.{Boolean => JBool} + +import org.apache.flink.api.common.functions.MapFunction +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.typeutils.RowTypeInfo +import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2} +import org.apache.flink.streaming.api.TimeCharacteristic +import org.apache.flink.streaming.api.datastream.DataStream +import org.apache.flink.streaming.api.functions.sink.SinkFunction +import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment +import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase +import org.apache.flink.table.api.{TableEnvironment, TableException} +import org.apache.flink.table.api.scala._ +import org.apache.flink.table.api.scala.stream.utils.StreamTestData +import org.apache.flink.types.Row + +import org.junit.Assert._ +import org.junit.Test + +import scala.collection.mutable + +class StreamTableSinksITCase extends StreamingMultipleProgramsTestBase { + + @Test(expected = classOf[TableException]) + def testAppendSinkOnUpdatingTable(): Unit = { + val env = StreamExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + + val t = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'id, 'num, 'text) + + t.groupBy('text) + .select('text, 'id.count, 'num.sum) + .writeToSink(new TestAppendSink) + + // must fail because table is not append-only + env.execute() + } + + @Test + def testAppendSinkOnAppendTable(): Unit = { + val env = StreamExecutionEnvironment.getExecutionEnvironment + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) + val tEnv = TableEnvironment.getTableEnvironment(env) + + val t = StreamTestData.get3TupleDataStream(env) + .assignAscendingTimestamps(_._1.toLong) + .toTable(tEnv, 'id, 'num, 'text, 'rowtime.rowtime) + + t.window(Tumble over 5.millis on 'rowtime as 'w) + .groupBy('w) + .select('w.end, 'id.count, 'num.sum) + .writeToSink(new TestAppendSink) + + env.execute() + + val result = RowCollector.getAndClearValues.map(_.f1.toString).sorted + val expected = List( + "1970-01-01 00:00:00.005,4,8", + "1970-01-01 00:00:00.01,5,18", + "1970-01-01 00:00:00.015,5,24", + "1970-01-01 00:00:00.02,5,29", + "1970-01-01 00:00:00.025,2,12") + .sorted + assertEquals(expected, result) + } + + @Test + def testRetractSinkOnUpdatingTable(): Unit = { + val env = StreamExecutionEnvironment.getExecutionEnvironment + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) + val tEnv = TableEnvironment.getTableEnvironment(env) + + val t = StreamTestData.get3TupleDataStream(env) + .assignAscendingTimestamps(_._1.toLong) + .toTable(tEnv, 'id, 'num, 'text) + + t.select('id, 'num, 'text.charLength() as 'len) + .groupBy('len) + .select('len, 'id.count, 'num.sum) + .writeToSink(new TestRetractSink) + + env.execute() + val results = RowCollector.getAndClearValues + + val retracted = restractResults(results).sorted + val expected = List( + "2,1,1", + "5,1,2", + "11,1,2", + "25,1,3", + "10,7,39", + "14,1,3", + "9,9,41").sorted + assertEquals(expected, retracted) + + } + + @Test + def testRetractSinkOnAppendTable(): Unit = { + val env = StreamExecutionEnvironment.getExecutionEnvironment + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) + val tEnv = TableEnvironment.getTableEnvironment(env) + + val t = StreamTestData.get3TupleDataStream(env) + .assignAscendingTimestamps(_._1.toLong) + .toTable(tEnv, 'id, 'num, 'text, 'rowtime.rowtime) + + t.window(Tumble over 5.millis 
on 'rowtime as 'w) + .groupBy('w) + .select('w.end, 'id.count, 'num.sum) + .writeToSink(new TestRetractSink) + + env.execute() + val results = RowCollector.getAndClearValues + + assertFalse( + "Received retraction messages for append only table", + results.exists(!_.f0)) + + val retracted = retractResults(results).sorted + val expected = List( + "1970-01-01 00:00:00.005,4,8", + "1970-01-01 00:00:00.01,5,18", + "1970-01-01 00:00:00.015,5,24", + "1970-01-01 00:00:00.02,5,29", + "1970-01-01 00:00:00.025,2,12") + .sorted + assertEquals(expected, retracted) + + } + + @Test + def testUpsertSinkOnUpdatingTableWithFullKey(): Unit = { + val env = StreamExecutionEnvironment.getExecutionEnvironment + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) + val tEnv = TableEnvironment.getTableEnvironment(env) + + val t = StreamTestData.get3TupleDataStream(env) + .assignAscendingTimestamps(_._1.toLong) + .toTable(tEnv, 'id, 'num, 'text) + + t.select('id, 'num, 'text.charLength() as 'len, ('id > 0) as 'cTrue) + .groupBy('len, 'cTrue) + .select('len, 'id.count as 'cnt, 'cTrue) + .groupBy('cnt, 'cTrue) + .select('cnt, 'len.count, 'cTrue) + .writeToSink(new TestUpsertSink(Array("cnt", "cTrue"), false)) + + env.execute() + val results = RowCollector.getAndClearValues + + assertTrue( + "Results must include delete messages", + results.exists(!_.f0) + ) + + val retracted = upsertResults(results, Array(0, 2)).sorted + val expected = List( + "1,5,true", + "7,1,true", + "9,1,true").sorted + assertEquals(expected, retracted) + + } + + @Test(expected = classOf[TableException]) + def testUpsertSinkOnUpdatingTableWithoutFullKey(): Unit = { + val env = StreamExecutionEnvironment.getExecutionEnvironment + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) + val tEnv = TableEnvironment.getTableEnvironment(env) + + val t = StreamTestData.get3TupleDataStream(env) + .assignAscendingTimestamps(_._1.toLong) + .toTable(tEnv, 'id, 'num, 'text) + + t.select('id, 'num, 'text.charLength() as 'len, ('id > 0) as 'cTrue) + .groupBy('len, 'cTrue) + .select('len, 'id.count, 'num.sum) + .writeToSink(new TestUpsertSink(Array("len", "cTrue"), false)) + + // must fail because the table is an updating table without a full key + env.execute() + } + + @Test + def testUpsertSinkOnAppendingTableWithFullKey1(): Unit = { + val env = StreamExecutionEnvironment.getExecutionEnvironment + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) + val tEnv = TableEnvironment.getTableEnvironment(env) + + val t = StreamTestData.get3TupleDataStream(env) + .assignAscendingTimestamps(_._1.toLong) + .toTable(tEnv, 'id, 'num, 'text, 'rowtime.rowtime) + + t.window(Tumble over 5.millis on 'rowtime as 'w) + .groupBy('w, 'num) + .select('num, 'w.end as 'wend, 'id.count) + .writeToSink(new TestUpsertSink(Array("wend", "num"), true)) + + env.execute() + val results = RowCollector.getAndClearValues + + assertFalse( + "Received retraction messages for append only table", + results.exists(!_.f0)) + + val retracted = upsertResults(results, Array(0, 1, 2)).sorted + val expected = List( + "1,1970-01-01 00:00:00.005,1", + "2,1970-01-01 00:00:00.005,2", + "3,1970-01-01 00:00:00.005,1", + "3,1970-01-01 00:00:00.01,2", + "4,1970-01-01 00:00:00.01,3", + "4,1970-01-01 00:00:00.015,1", + "5,1970-01-01 00:00:00.015,4", + "5,1970-01-01 00:00:00.02,1", + "6,1970-01-01 00:00:00.02,4", + "6,1970-01-01 00:00:00.025,2").sorted + assertEquals(expected, retracted) + } + + @Test + def testUpsertSinkOnAppendingTableWithFullKey2(): Unit = { + val env =
StreamExecutionEnvironment.getExecutionEnvironment + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) + val tEnv = TableEnvironment.getTableEnvironment(env) + + val t = StreamTestData.get3TupleDataStream(env) + .assignAscendingTimestamps(_._1.toLong) + .toTable(tEnv, 'id, 'num, 'text, 'rowtime.rowtime) + + t.window(Tumble over 5.millis on 'rowtime as 'w) + .groupBy('w, 'num) + .select('w.start as 'wstart, 'w.end as 'wend, 'num, 'id.count) + .writeToSink(new TestUpsertSink(Array("wstart", "wend", "num"), true)) + + env.execute() + val results = RowCollector.getAndClearValues + + assertFalse( + "Received retraction messages for append only table", + results.exists(!_.f0)) + + val retracted = upsertResults(results, Array(0, 1, 2)).sorted + val expected = List( + "1970-01-01 00:00:00.0,1970-01-01 00:00:00.005,1,1", + "1970-01-01 00:00:00.0,1970-01-01 00:00:00.005,2,2", + "1970-01-01 00:00:00.0,1970-01-01 00:00:00.005,3,1", + "1970-01-01 00:00:00.005,1970-01-01 00:00:00.01,3,2", + "1970-01-01 00:00:00.005,1970-01-01 00:00:00.01,4,3", + "1970-01-01 00:00:00.01,1970-01-01 00:00:00.015,4,1", + "1970-01-01 00:00:00.01,1970-01-01 00:00:00.015,5,4", + "1970-01-01 00:00:00.015,1970-01-01 00:00:00.02,5,1", + "1970-01-01 00:00:00.015,1970-01-01 00:00:00.02,6,4", + "1970-01-01 00:00:00.02,1970-01-01 00:00:00.025,6,2").sorted + assertEquals(expected, retracted) + } + + @Test + def testUpsertSinkOnAppendingTableWithoutFullKey1(): Unit = { + val env = StreamExecutionEnvironment.getExecutionEnvironment + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) + val tEnv = TableEnvironment.getTableEnvironment(env) + + val t = StreamTestData.get3TupleDataStream(env) + .assignAscendingTimestamps(_._1.toLong) + .toTable(tEnv, 'id, 'num, 'text, 'rowtime.rowtime) + + t.window(Tumble over 5.millis on 'rowtime as 'w) + .groupBy('w, 'num) + .select('w.end as 'wend, 'id.count as 'cnt) + .writeToSink(new TestUpsertSink(null, true)) + + env.execute() + val results = RowCollector.getAndClearValues + + assertFalse( + "Received retraction messages for append only table", + results.exists(!_.f0)) + + val retracted = results.map(_.f1.toString).sorted + val expected = List( + "1970-01-01 00:00:00.005,1", + "1970-01-01 00:00:00.005,2", + "1970-01-01 00:00:00.005,1", + "1970-01-01 00:00:00.01,2", + "1970-01-01 00:00:00.01,3", + "1970-01-01 00:00:00.015,1", + "1970-01-01 00:00:00.015,4", + "1970-01-01 00:00:00.02,1", + "1970-01-01 00:00:00.02,4", + "1970-01-01 00:00:00.025,2").sorted + assertEquals(expected, retracted) + } + + @Test + def testUpsertSinkOnAppendingTableWithoutFullKey2(): Unit = { + val env = StreamExecutionEnvironment.getExecutionEnvironment + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) + val tEnv = TableEnvironment.getTableEnvironment(env) + + val t = StreamTestData.get3TupleDataStream(env) + .assignAscendingTimestamps(_._1.toLong) + .toTable(tEnv, 'id, 'num, 'text, 'rowtime.rowtime) + + t.window(Tumble over 5.millis on 'rowtime as 'w) + .groupBy('w, 'num) + .select('num, 'id.count as 'cnt) + .writeToSink(new TestUpsertSink(null, true)) + + env.execute() + val results = RowCollector.getAndClearValues + + assertFalse( + "Received retraction messages for append only table", + results.exists(!_.f0)) + + val retracted = results.map(_.f1.toString).sorted + val expected = List( + "1,1", + "2,2", + "3,1", + "3,2", + "4,3", + "4,1", + "5,4", + "5,1", + "6,4", + "6,2").sorted + assertEquals(expected, retracted) + } + + /** Converts a list of retraction messages into a list of final 
results. */ + private def retractResults(results: List[JTuple2[JBool, Row]]): List[String] = { + + val retracted = results + .foldLeft(Map[String, Int]()){ (m: Map[String, Int], v: JTuple2[JBool, Row]) => + val cnt = m.getOrElse(v.f1.toString, 0) + if (v.f0) { + m + (v.f1.toString -> (cnt + 1)) + } else { + m + (v.f1.toString -> (cnt - 1)) + } + }.filter{ case (_, c: Int) => c != 0 } + + assertFalse( + "Received retracted rows which have not been accumulated.", + retracted.exists{ case (_, c: Int) => c < 0}) + + retracted.flatMap { case (r: String, c: Int) => (0 until c).map(_ => r) }.toList + } + + /** Converts a list of upsert messages into a list of final results. */ + private def upsertResults(results: List[JTuple2[JBool, Row]], keys: Array[Int]): List[String] = { + + def getKeys(r: Row): List[String] = + keys.foldLeft(List[String]())((k, i) => r.getField(i).toString :: k) + + val upserted = results.foldLeft(Map[String, String]()){ (o: Map[String, String], r) => + val key = getKeys(r.f1).mkString("") + if (r.f0) { + o + (key -> r.f1.toString) + } else { + o - key + } + } + + upserted.values.toList + } + +} + +private class TestAppendSink extends AppendStreamTableSink[Row] { + + var fNames: Array[String] = _ + var fTypes: Array[TypeInformation[_]] = _ + + override def emitDataStream(s: DataStream[Row]): Unit = { + s.map( + new MapFunction[Row, JTuple2[JBool, Row]] { + override def map(value: Row): JTuple2[JBool, Row] = new JTuple2(true, value) + }) + .addSink(new RowSink) + } + + override def getOutputType: TypeInformation[Row] = new RowTypeInfo(fTypes, fNames) + + override def getFieldNames: Array[String] = fNames + + override def getFieldTypes: Array[TypeInformation[_]] = fTypes + + override def configure( + fieldNames: Array[String], + fieldTypes: Array[TypeInformation[_]]): TableSink[Row] = { + val copy = new TestAppendSink + copy.fNames = fieldNames + copy.fTypes = fieldTypes + copy + } +} + +private class TestRetractSink extends RetractStreamTableSink[Row] { + + var fNames: Array[String] = _ + var fTypes: Array[TypeInformation[_]] = _ + + override def emitDataStream(s: DataStream[JTuple2[JBool, Row]]): Unit = { + s.addSink(new RowSink) + } + + override def getRecordType: TypeInformation[Row] = new RowTypeInfo(fTypes, fNames) + + override def getFieldNames: Array[String] = fNames + + override def getFieldTypes: Array[TypeInformation[_]] = fTypes + + override def configure( + fieldNames: Array[String], + fieldTypes: Array[TypeInformation[_]]): TableSink[JTuple2[JBool, Row]] = { + val copy = new TestRetractSink + copy.fNames = fieldNames + copy.fTypes = fieldTypes + copy + } + +} + +private class TestUpsertSink( + expectedKeys: Array[String], + expectedIsAppendOnly: Boolean) + extends UpsertStreamTableSink[Row] { + + var fNames: Array[String] = _ + var fTypes: Array[TypeInformation[_]] = _ + + override def setKeyFields(keys: Array[String]): Unit = + if (keys != null) { + assertEquals("Provided key fields do not match expected keys", + expectedKeys.sorted.mkString(","), + keys.sorted.mkString(",")) + } else { + assertNull("Key fields were expected but none were provided.", expectedKeys) + } + + override def setIsAppendOnly(isAppendOnly: Boolean): Unit = + assertEquals( + "Provided isAppendOnly does not match expected isAppendOnly", + expectedIsAppendOnly, + isAppendOnly) + + override def getRecordType: TypeInformation[Row] = new RowTypeInfo(fTypes, fNames) + + override def emitDataStream(s: DataStream[JTuple2[JBool, Row]]): Unit = { + s.addSink(new RowSink) + } + + override def getFieldNames:
Array[String] = fNames + + override def getFieldTypes: Array[TypeInformation[_]] = fTypes + + override def configure( + fieldNames: Array[String], + fieldTypes: Array[TypeInformation[_]]): TableSink[JTuple2[JBool, Row]] = { + val copy = new TestUpsertSink(expectedKeys, expectedIsAppendOnly) + copy.fNames = fieldNames + copy.fTypes = fieldTypes + copy + } +} + +class RowSink extends SinkFunction[JTuple2[JBool, Row]] { + override def invoke(value: JTuple2[JBool, Row]): Unit = RowCollector.addValue(value) +} + +object RowCollector { + private val sink: mutable.ArrayBuffer[JTuple2[JBool, Row]] = + new mutable.ArrayBuffer[JTuple2[JBool, Row]]() + + def addValue(value: JTuple2[JBool, Row]): Unit = { + sink.synchronized { + sink += value + } + } + + // read and clear under the same lock that addValue uses + def getAndClearValues: List[JTuple2[JBool, Row]] = { + sink.synchronized { + val out = sink.toList + sink.clear() + out + } + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/f37988c1/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala index 79e957a..8626b07 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala @@ -39,9 +39,4 @@ class MockTableEnvironment extends TableEnvironment(new TableConfig) { override protected def getBuiltInPhysicalOptRuleSet: RuleSet = ??? - override protected def getConversionMapper[IN, OUT]( - physicalTypeInfo: TypeInformation[IN], - logicalRowType: RelDataType, - requestedTypeInfo: TypeInformation[OUT], - functionName: String) = ??? } http://git-wip-us.apache.org/repos/asf/flink/blob/f37988c1/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala index 73bc2f8..0e6d461 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala @@ -211,7 +211,7 @@ case class StreamTableTestUtil() extends TableTestUtil { def verifyTable(resultTable: Table, expected: String): Unit = { val relNode = resultTable.getRelNode - val optimized = tEnv.optimize(relNode) + val optimized = tEnv.optimize(relNode, updatesAsRetraction = false) val actual = RelOptUtil.toString(optimized) assertEquals( expected.split("\n").map(_.trim).mkString("\n"), @@ -221,7 +221,7 @@ case class StreamTableTestUtil() extends TableTestUtil { // the print methods are for debugging purposes only def printTable(resultTable: Table): Unit = { val relNode = resultTable.getRelNode - val optimized = tEnv.optimize(relNode) + val optimized = tEnv.optimize(relNode, updatesAsRetraction = false) println(RelOptUtil.toString(optimized)) }
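
Note on the test utilities above: how the sinks' (Boolean, Row) change messages collapse into final results can be illustrated with a small standalone sketch. The data here is hypothetical and plain Scala pairs stand in for JTuple2[JBool, Row]; it mirrors the folding logic of retractResults and upsertResults but is not part of the commit.

object ChangelogFoldExample {
  def main(args: Array[String]): Unit = {
    // Hypothetical retract-style changelog: (true, row) accumulates, (false, row) retracts.
    val retractMessages = List((true, "A,1"), (true, "B,2"), (false, "A,1"), (true, "A,2"))

    // Same idea as retractResults: count +1/-1 per row, keep rows with a positive count.
    val finalRows = retractMessages
      .foldLeft(Map[String, Int]()) { case (counts, (accumulate, row)) =>
        counts + (row -> (counts.getOrElse(row, 0) + (if (accumulate) 1 else -1)))
      }
      .filter { case (_, cnt) => cnt > 0 }
      .flatMap { case (row, cnt) => List.fill(cnt)(row) }
      .toList
    println(finalRows.sorted.mkString(" | "))  // prints: A,2 | B,2

    // Hypothetical upsert-style changelog, keyed on the first CSV field.
    val upsertMessages = List((true, "A,1"), (true, "A,2"), (true, "B,7"), (false, "B,7"))

    // Same idea as upsertResults: the last value per key wins, a delete drops the key.
    val upserted = upsertMessages.foldLeft(Map[String, String]()) {
      case (table, (upsert, row)) =>
        val key = row.split(',')(0)
        if (upsert) table + (key -> row) else table - key
    }
    println(upserted.values.toList.sorted.mkString(" | "))  // prints: A,2
  }
}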