JingsongLi commented on a change in pull request #9195:
[FLINK-13289][table-planner-blink] Blink-planner should setKeyFields to upsert table sink
URL: https://github.com/apache/flink/pull/9195#discussion_r307103615
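
Context for reviewers: per the PR title, FLINK-13289 is about the blink planner
deriving the unique key of the query and passing it to
UpsertStreamTableSink#setKeyFields before translation, as the legacy planner
already does. Below is a minimal sketch of the sink side of that contract,
assuming the Flink 1.9 org.apache.flink.table.sinks.UpsertStreamTableSink API;
RecordingUpsertSink and all of its member names are illustrative only, not part
of this PR:

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.streaming.api.datastream.{DataStream, DataStreamSink}
import org.apache.flink.streaming.api.functions.sink.DiscardingSink
import org.apache.flink.table.sinks.{TableSink, UpsertStreamTableSink}
import org.apache.flink.types.Row

import java.lang.{Boolean => JBool}

// Illustrative sink that records what the planner hands to setKeyFields,
// so a test could assert on it. Not part of this PR.
class RecordingUpsertSink extends UpsertStreamTableSink[Row] {

  var keys: Array[String] = _ // filled in by the planner
  private var fNames: Array[String] = _
  private var fTypes: Array[TypeInformation[_]] = _

  // FLINK-13289: the blink planner must derive the query's unique key and
  // pass it to the sink here before translating the plan.
  override def setKeyFields(keys: Array[String]): Unit = this.keys = keys

  // Also set by the planner: whether the query produces only inserts.
  override def setIsAppendOnly(isAppendOnly: JBool): Unit = {}

  override def getRecordType: TypeInformation[Row] = new RowTypeInfo(fTypes, fNames)

  override def getFieldNames: Array[String] = fNames

  override def getFieldTypes: Array[TypeInformation[_]] = fTypes

  override def configure(
      fieldNames: Array[String],
      fieldTypes: Array[TypeInformation[_]]): TableSink[JTuple2[JBool, Row]] = {
    fNames = fieldNames
    fTypes = fieldTypes
    this
  }

  // Upsert sinks receive (isAccumulate, row) pairs; this sketch discards them.
  override def consumeDataStream(
      dataStream: DataStream[JTuple2[JBool, Row]]): DataStreamSink[_] =
    dataStream.addSink(new DiscardingSink[JTuple2[JBool, Row]])
}
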
##
File path:
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala
##
@@ -0,0 +1,701 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.stream.table
+
+import org.apache.flink.api.common.functions.MapFunction
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.streaming.api.datastream.{DataStream, DataStreamSink}
+import org.apache.flink.streaming.api.functions.sink.SinkFunction
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.{TableException, Tumble, Types}
+import org.apache.flink.table.planner.runtime.utils.TestData.{smallTupleData3, tupleData3, tupleData5}
+import org.apache.flink.table.planner.utils.MemoryTableSourceSinkUtil
+import org.apache.flink.table.sinks._
+import org.apache.flink.test.util.{AbstractTestBase, TestBaseUtils}
+import org.apache.flink.types.Row
+
+import org.junit.Assert._
+import org.junit.Test
+
+import java.io.File
+import java.lang.{Boolean => JBool}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+class TableSinkITCase extends AbstractTestBase {
+
+  @Test
+  def testInsertIntoRegisteredTableSink(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.getConfig.enableObjectReuse()
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+
+    val tEnv = StreamTableEnvironment.create(env)
+    MemoryTableSourceSinkUtil.clear()
+
+    val input = env.fromCollection(tupleData3)
+      .assignAscendingTimestamps(r => r._2)
+    val fieldNames = Array("d", "e", "t")
+    val fieldTypes: Array[TypeInformation[_]] =
+      Array(Types.STRING, Types.SQL_TIMESTAMP, Types.LONG)
+    val sink = new MemoryTableSourceSinkUtil.UnsafeMemoryAppendTableSink
+    tEnv.registerTableSink("targetTable", sink.configure(fieldNames, fieldTypes))
+
+    input.toTable(tEnv, 'a, 'b, 'c, 't.rowtime)
+      .where('a < 3 || 'a > 19)
+      .select('c, 't, 'b)
+      .insertInto("targetTable")
+    env.execute()
+
+    val expected = Seq(
+      "Hi,1970-01-01 00:00:00.001,1",
+      "Hello,1970-01-01 00:00:00.002,2",
+      "Comment#14,1970-01-01 00:00:00.006,6",
+      "Comment#15,1970-01-01 00:00:00.006,6").mkString("\n")
+
+    TestBaseUtils.compareResultAsText(MemoryTableSourceSinkUtil.tableData.asJava, expected)
+  }
+
+  @Test
+  def testStreamTableSink(): Unit = {
+
+    val tmpFile = File.createTempFile("flink-table-sink-test", ".tmp")
+    tmpFile.deleteOnExit()
+    val path = tmpFile.toURI.toString
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.getConfig.enableObjectReuse()
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+
+    val tEnv = StreamTableEnvironment.create(env)
+    env.setParallelism(4)
+
+    tEnv.registerTableSink(
+      "csvSink",
+      new CsvTableSink(path).configure(
+        Array[String]("c", "b"),
+        Array[TypeInformation[_]](Types.STRING, Types.SQL_TIMESTAMP)))
+
+    val input = env.fromCollection(tupleData3)
+      .assignAscendingTimestamps(_._2)
+      .map(x => x).setParallelism(4) // increase DOP to 4
+
+    input.toTable(tEnv, 'a, 'b.rowtime, 'c)
+      .where('a < 5 || 'a > 17)
+      .select('c, 'b)
+      .insertInto("csvSink")
+
+    env.execute()
+
+    val expected = Seq(
+      "Hi,1970-01-01 00:00:00.001",
+      "Hello,1970-01-01 00:00:00.002",
+      "Hello world,1970-01-01 00:00:00.002",
+      "Hello world, how are you?,1970-01-01 00:00:00.003",
+      "Comment#12,1970-01-01 00:00:00.006",
+      "Comment#13,1970-01-01 00:00:00.006",
+      "Comment#14,1970-01-01 00:00:00.006",
+