This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.11 by this push:
     new 9984b58  [FLINK-21013][table-planner-blink] Ingest row time into StreamRecord in Blink planner
9984b58 is described below

commit 9984b5869b2f35b1fb8ce75e09ad73574d9f91ec
Author: Leonard Xu <xbjt...@gmail.com>
AuthorDate: Thu Jan 28 12:01:39 2021 +0800

    [FLINK-21013][table-planner-blink] Ingest row time into StreamRecord in Blink planner

    This closes #14785.
---
 .../planner/codegen/OperatorCodeGenerator.scala    |   3 +
 .../table/planner/codegen/SinkCodeGenerator.scala  |  32 ++-
 .../physical/stream/StreamExecLegacySink.scala     |   8 +-
 .../runtime/stream/table/TableSinkITCase.scala     |  94 +------
 .../stream/table/TableToDataStreamITCase.scala     | 308 +++++++++++++++++++++
 5 files changed, 346 insertions(+), 99 deletions(-)

diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/OperatorCodeGenerator.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/OperatorCodeGenerator.scala
index a846592..8e3a9ed 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/OperatorCodeGenerator.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/OperatorCodeGenerator.scala
@@ -256,4 +256,7 @@ object OperatorCodeGenerator extends Logging {
 
   def generateCollect(emit: String): String =
     s"$DEFAULT_OPERATOR_COLLECTOR_TERM.collect($OUT_ELEMENT.replace($emit));"
+
+  def generateCollectWithTimestamp(emit: String, timestampTerm: String): String =
+    s"$DEFAULT_OPERATOR_COLLECTOR_TERM.collect($OUT_ELEMENT.replace($emit, $timestampTerm));"
 }
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/SinkCodeGenerator.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/SinkCodeGenerator.scala
index f46bd7c..1bd473d 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/SinkCodeGenerator.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/SinkCodeGenerator.scala
@@ -29,7 +29,7 @@ import org.apache.flink.table.data.util.RowDataUtil
 import org.apache.flink.table.data.{GenericRowData, RowData}
 import org.apache.flink.table.planner.codegen.CodeGenUtils.genToExternal
 import org.apache.flink.table.planner.codegen.GeneratedExpression.NO_CODE
-import org.apache.flink.table.planner.codegen.OperatorCodeGenerator.generateCollect
+import org.apache.flink.table.planner.codegen.OperatorCodeGenerator.{generateCollect, generateCollectWithTimestamp}
 import org.apache.flink.table.planner.sinks.TableSinkUtils
 import org.apache.flink.table.runtime.operators.CodeGenOperatorFactory
 import org.apache.flink.table.runtime.types.TypeInfoDataTypeConverter.fromDataTypeToTypeInfo
@@ -48,7 +48,8 @@ object SinkCodeGenerator {
       inputRowType: RowType,
       sink: TableSink[_],
       withChangeFlag: Boolean,
-      operatorName: String): (CodeGenOperatorFactory[OUT], TypeInformation[OUT]) = {
+      operatorName: String,
+      rowtimeIndex: Int = -1): (CodeGenOperatorFactory[OUT], TypeInformation[OUT]) = {
 
     val physicalOutputType = TableSinkUtils.inferSinkPhysicalDataType(
       sink.getConsumedDataType,
@@ -69,6 +70,7 @@ object SinkCodeGenerator {
 
     val inputTerm = CodeGenUtils.DEFAULT_INPUT1_TERM
     var afterIndexModify = inputTerm
+    var modifiedRowtimeIndex = rowtimeIndex
     val fieldIndexProcessCode = physicalTypeInfo match {
       case pojo: PojoTypeInfo[_] =>
         val mapping = pojo.getFieldNames.map { name =>
@@ -88,6 +90,10 @@
             (0 until pojo.getArity)
               .map(pojo.getTypeAt)
               .map(fromTypeInfoToLogicalType): _*)
+        if (rowtimeIndex >= 0) {
+          modifiedRowtimeIndex = outputRowType.getFieldIndex(
+            inputRowType.getFieldNames.get(rowtimeIndex))
+        }
         val conversion = resultGenerator.generateConverterResultExpression(
           outputRowType,
           classOf[GenericRowData])
@@ -114,7 +120,7 @@
             |$tupleClass $resultTerm = new $tupleClass();
             |$resultTerm.setField($flagResultTerm, 0);
             |$resultTerm.setField($outTerm, 1);
-            |${generateCollect(resultTerm)}
+            |${generateCollectCode(afterIndexModify, resultTerm, modifiedRowtimeIndex)}
           """.stripMargin
       } else {
         // Scala Case Class
@@ -133,11 +139,11 @@
             |$fieldsTerm[0] = $flagResultTerm;
             |$fieldsTerm[1] = $outTerm;
             |$tupleClass $resultTerm = ($tupleClass) $serializerTerm.createInstance($fieldsTerm);
-            |${generateCollect(resultTerm)}
+            |${generateCollectCode(afterIndexModify, resultTerm, modifiedRowtimeIndex)}
           """.stripMargin
       }
     } else {
-      generateCollect(outTerm)
+      generateCollectCode(afterIndexModify, outTerm, modifiedRowtimeIndex)
     }
 
     val generated = OperatorCodeGenerator.generateOneInputStreamOperator[RowData, OUT](
@@ -150,4 +156,20 @@
       inputRowType)
     (new CodeGenOperatorFactory[OUT](generated), outputTypeInfo.asInstanceOf[TypeInformation[OUT]])
   }
+
+  private def generateCollectCode(
+      afterIndexModify: String,
+      resultTerm: String,
+      modifiedRowtimeIndex: Int): String = {
+    if (modifiedRowtimeIndex >= 0) {
+      val rowtimeTerm = CodeGenUtils.newName("rowtime")
+      s"""
+         | Long $rowtimeTerm =
+         |   $afterIndexModify.getTimestamp($modifiedRowtimeIndex, 3).getMillisecond();
+         | ${generateCollectWithTimestamp(resultTerm, rowtimeTerm)}
+      """.stripMargin
+    } else {
+      generateCollect(resultTerm)
+    }
+  }
 }
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecLegacySink.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecLegacySink.scala
index f8d5006..5aed5ef 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecLegacySink.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecLegacySink.scala
@@ -188,13 +188,19 @@ class StreamExecLegacySink[T](
       if (CodeGenUtils.isInternalClass(resultDataType)) {
         parTransformation.asInstanceOf[Transformation[T]]
       } else {
+        val rowtimeIndex = if (rowtimeFields.size == 1) {
+          rowtimeFields.head.getIndex
+        } else {
+          -1
+        }
         val (converterOperator, outputTypeInfo) = generateRowConverterOperator[T](
           CodeGeneratorContext(config),
           config,
           convType.asInstanceOf[RowDataTypeInfo].toRowType,
           sink,
           withChangeFlag,
-          "SinkConversion"
+          "SinkConversion",
+          rowtimeIndex
         )
         new OneInputTransformation(
           parTransformation,
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableSinkITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableSinkITCase.scala
index 6bc2bc0..0a654c3 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableSinkITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableSinkITCase.scala
@@ -22,7 +22,7 @@ import org.apache.flink.api.scala._
 import org.apache.flink.table.api._
 import org.apache.flink.table.api.bridge.scala._
 import org.apache.flink.table.planner.factories.TestValuesTableFactory
-import org.apache.flink.table.planner.factories.TestValuesTableFactory.{TestSinkContextTableSink, changelogRow}
+import org.apache.flink.table.planner.factories.TestValuesTableFactory.changelogRow
 import org.apache.flink.table.planner.runtime.utils.StreamingTestBase
 import org.apache.flink.table.planner.runtime.utils.TestData.{nullData4, smallTupleData3, tupleData3, tupleData5}
 import org.apache.flink.util.ExceptionUtils
@@ -31,9 +31,6 @@ import org.junit.Test
 
 import java.lang.{Long => JLong}
 import java.math.{BigDecimal => JBigDecimal}
-import java.sql.Timestamp
-import java.time.{LocalDateTime, OffsetDateTime, ZoneId, ZoneOffset}
-import java.util.TimeZone
 
 import scala.collection.JavaConversions._
 
@@ -622,93 +619,4 @@ class TableSinkITCase extends StreamingTestBase {
     val expected = List("book,1,12", "book,4,11", "fruit,3,44")
     assertEquals(expected.sorted, result.sorted)
   }
-
-  @Test
-  def testSinkContext(): Unit = {
-    val data = List(
-      rowOf("1970-01-01 00:00:00.001", localDateTime(1L), 1, 1d),
-      rowOf("1970-01-01 00:00:00.002", localDateTime(2L), 1, 2d),
-      rowOf("1970-01-01 00:00:00.003", localDateTime(3L), 1, 2d),
-      rowOf("1970-01-01 00:00:00.004", localDateTime(4L), 1, 5d),
-      rowOf("1970-01-01 00:00:00.007", localDateTime(7L), 1, 3d),
-      rowOf("1970-01-01 00:00:00.008", localDateTime(8L), 1, 3d),
-      rowOf("1970-01-01 00:00:00.016", localDateTime(16L), 1, 4d))
-
-    val dataId: String = TestValuesTableFactory.registerData(data)
-
-    val sourceDDL =
-      s"""
-         |CREATE TABLE src (
-         |  log_ts STRING,
-         |  ts TIMESTAMP(3),
-         |  a INT,
-         |  b DOUBLE,
-         |  WATERMARK FOR ts AS ts - INTERVAL '0.001' SECOND
-         |) WITH (
-         |  'connector' = 'values',
-         |  'data-id' = '$dataId'
-         |)
-      """.stripMargin
-
-    val sinkDDL =
-      s"""
-         |CREATE TABLE sink (
-         |  log_ts STRING,
-         |  ts TIMESTAMP(3),
-         |  a INT,
-         |  b DOUBLE
-         |) WITH (
-         |  'connector' = 'values',
-         |  'table-sink-class' = '${classOf[TestSinkContextTableSink].getName}'
-         |)
-      """.stripMargin
-
-    tEnv.executeSql(sourceDDL)
-    tEnv.executeSql(sinkDDL)
-
-    //---------------------------------------------------------------------------------------
-    // Verify writing out a source directly with the rowtime attribute
-    //---------------------------------------------------------------------------------------
-
-    execInsertSqlAndWaitResult("INSERT INTO sink SELECT * FROM src")
-
-    val expected = List(1000, 2000, 3000, 4000, 7000, 8000, 16000)
-    assertEquals(expected.sorted, TestSinkContextTableSink.ROWTIMES.sorted)
-
-    val sinkDDL2 =
-      s"""
-         |CREATE TABLE sink2 (
-         |  window_rowtime TIMESTAMP(3),
-         |  b DOUBLE
-         |) WITH (
-         |  'connector' = 'values',
-         |  'table-sink-class' = '${classOf[TestSinkContextTableSink].getName}'
-         |)
-      """.stripMargin
-    tEnv.executeSql(sinkDDL2)
-
-    //---------------------------------------------------------------------------------------
-    // Verify writing out with additional operator to generate a new rowtime attribute
-    //---------------------------------------------------------------------------------------
-
-    execInsertSqlAndWaitResult(
-      """
-        |INSERT INTO sink2
-        |SELECT
-        |  TUMBLE_ROWTIME(ts, INTERVAL '5' SECOND),
-        |  SUM(b)
-        |FROM src
-        |GROUP BY TUMBLE(ts, INTERVAL '5' SECOND)
-        |""".stripMargin
-    )
-
-    val expected2 = List(4999, 9999, 19999)
-    assertEquals(expected2.sorted, TestSinkContextTableSink.ROWTIMES.sorted)
-  }
-
-  // ------------------------------------------------------------------------------------------
-
-  private def localDateTime(epochSecond: Long): LocalDateTime = {
-    LocalDateTime.ofEpochSecond(epochSecond, 0, ZoneOffset.UTC)
-  }
 }
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableToDataStreamITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableToDataStreamITCase.scala
new file mode 100644
index 0000000..cd92266
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableToDataStreamITCase.scala
@@ -0,0 +1,308 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.runtime.stream.table
+
+import java.time.{LocalDateTime, ZoneOffset}
+import java.util.TimeZone
+
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.functions.sink.SinkFunction
+import org.apache.flink.streaming.api.scala.DataStream
+import org.apache.flink.table.api._
+import org.apache.flink.table.api.bridge.scala._
+import org.apache.flink.table.planner.factories.TestValuesTableFactory
+import org.apache.flink.table.planner.factories.TestValuesTableFactory.TestSinkContextTableSink
+import org.apache.flink.table.planner.runtime.utils.{AbstractExactlyOnceSink, StreamingTestBase, TestSinkUtil, TestingRetractSink}
+import org.apache.flink.types.Row
+import org.junit.Assert.assertEquals
+import org.junit.Test
+
+import scala.collection.JavaConversions._
+
+/**
+ * Tests that the conversion between [[Table]] and [[DataStream]] does not
+ * lose the row time attribute.
+ */
+final class TableToDataStreamITCase extends StreamingTestBase {
+
+  @Test
+  def testHasRowtimeFromTableToAppendStream(): Unit = {
+    val data = List(
+      rowOf(localDateTime(1L), "A"),
+      rowOf(localDateTime(2L), "B"),
+      rowOf(localDateTime(3L), "C"),
+      rowOf(localDateTime(4L), "D"),
+      rowOf(localDateTime(7L), "E"))
+
+    val dataId: String = TestValuesTableFactory.registerData(data)
+
+    val sourceDDL =
+      s"""
+         |CREATE TABLE src (
+         |  ts TIMESTAMP(3),
+         |  a STRING,
+         |  WATERMARK FOR ts AS ts - INTERVAL '0.005' SECOND
+         |) WITH (
+         |  'connector' = 'values',
+         |  'data-id' = '$dataId'
+         |)
+      """.stripMargin
+
+    tEnv.executeSql(sourceDDL)
+    val dataStream = tEnv.sqlQuery("SELECT a, ts FROM src").toAppendStream[Row]
+
+    val expected = List(
+      "A,1970-01-01T00:00:01,1000",
+      "B,1970-01-01T00:00:02,2000",
+      "C,1970-01-01T00:00:03,3000",
+      "D,1970-01-01T00:00:04,4000",
+      "E,1970-01-01T00:00:07,7000")
+
+    val sink = new StringWithTimestampSink[Row]
+    dataStream.addSink(sink)
+    env.execute("TableToAppendStream")
+    assertEquals(expected, sink.getResults.sorted)
+
+  }
+
+  @Test
+  def testHasRowtimeFromTableToRetractStream(): Unit = {
+    val data = List(
+      rowOf(localDateTime(1L), "A"),
+      rowOf(localDateTime(2L), "A"),
+      rowOf(localDateTime(3L), "C"),
+      rowOf(localDateTime(4L), "D"),
+      rowOf(localDateTime(7L), "E"))
+
+    val dataId: String = TestValuesTableFactory.registerData(data)
+
+    val sourceDDL =
+      s"""
+         |CREATE TABLE src (
+         |  ts TIMESTAMP(3),
+         |  a STRING,
+         |  WATERMARK FOR ts AS ts - INTERVAL '0.005' SECOND
+         |) WITH (
+         |  'connector' = 'values',
+         |  'data-id' = '$dataId'
+         |)
+      """.stripMargin
+
+    tEnv.executeSql(sourceDDL)
+    val dataStream = tEnv.sqlQuery(
+      """
+        |SELECT a, ts
+        |FROM (
+        |  SELECT *,
+        |    ROW_NUMBER() OVER (PARTITION BY a ORDER BY ts DESC) as rowNum
+        |  FROM src
+        |)
+        |WHERE rowNum = 1
+      """.stripMargin
+    ).toRetractStream[Row]
+
+    val sink = new StringWithTimestampRetractSink[Row]
+    dataStream.addSink(sink)
+    env.execute("TableToRetractStream")
+
+    val expected = List(
+      "A,1970-01-01T00:00:02,2000",
+      "C,1970-01-01T00:00:03,3000",
+      "D,1970-01-01T00:00:04,4000",
+      "E,1970-01-01T00:00:07,7000")
+    assertEquals(expected, sink.getRetractResults.sorted)
+
+    val expectedRetract = List(
+      "(true,A,1970-01-01T00:00:01,1000)",
+      "(false,A,1970-01-01T00:00:01,1000)",
+      "(true,A,1970-01-01T00:00:02,2000)",
+      "(true,C,1970-01-01T00:00:03,3000)",
+      "(true,D,1970-01-01T00:00:04,4000)",
+      "(true,E,1970-01-01T00:00:07,7000)")
+    assertEquals(expectedRetract.sorted, sink.getRawResults.sorted)
+  }
+
+  @Test
+  def testHasRowtimeFromDataStreamToTableBackDataStream(): Unit = {
+    val data = Seq(
+      (1L, "A"),
+      (2L, "B"),
+      (3L, "C"),
+      (4L, "D"),
+      (7L, "E"))
+
+    val ds1 = env.fromCollection(data)
+      // second to millisecond
+      .assignAscendingTimestamps(_._1 * 1000L)
+    val table = ds1.toTable(tEnv, 'ts, 'a, 'rowtime.rowtime)
+    tEnv.registerTable("t1", table)
+
+    val ds2 = tEnv.sqlQuery(
+      """
+        | SELECT CONCAT(a, '_'), ts, rowtime
+        | FROM t1
+      """.stripMargin
+    ).toAppendStream[Row]
+
+    val expected = List(
+      "A_,1,1970-01-01T00:00:01,1000",
+      "B_,2,1970-01-01T00:00:02,2000",
+      "C_,3,1970-01-01T00:00:03,3000",
+      "D_,4,1970-01-01T00:00:04,4000",
+      "E_,7,1970-01-01T00:00:07,7000")
+
+    val sink = new StringWithTimestampSink[Row]
+    ds2.addSink(sink)
+    env.execute("DataStreamToTableBackDataStream")
+    assertEquals(expected, sink.getResults.sorted)
+  }
+
+  @Test
+  def testHasRowtimeFromTableToExternalSystem(): Unit = {
+    val data = List(
+      rowOf("1970-01-01 00:00:00.001", localDateTime(1L), 1, 1d),
+      rowOf("1970-01-01 00:00:00.002", localDateTime(2L), 1, 2d),
+      rowOf("1970-01-01 00:00:00.003", localDateTime(3L), 1, 2d),
+      rowOf("1970-01-01 00:00:00.004", localDateTime(4L), 1, 5d),
+      rowOf("1970-01-01 00:00:00.007", localDateTime(7L), 1, 3d),
+      rowOf("1970-01-01 00:00:00.008", localDateTime(8L), 1, 3d),
+      rowOf("1970-01-01 00:00:00.016", localDateTime(16L), 1, 4d))
+
+    val dataId: String = TestValuesTableFactory.registerData(data)
+
+    val sourceDDL =
+      s"""
+         |CREATE TABLE src (
+         |  log_ts STRING,
+         |  ts TIMESTAMP(3),
+         |  a INT,
+         |  b DOUBLE,
+         |  WATERMARK FOR ts AS ts - INTERVAL '0.001' SECOND
+         |) WITH (
+         |  'connector' = 'values',
+         |  'data-id' = '$dataId'
+         |)
+      """.stripMargin
+
+    val sinkDDL =
+      s"""
+         |CREATE TABLE sink (
+         |  log_ts STRING,
+         |  ts TIMESTAMP(3),
+         |  a INT,
+         |  b DOUBLE
+         |) WITH (
+         |  'connector' = 'values',
+         |  'table-sink-class' = '${classOf[TestSinkContextTableSink].getName}'
+         |)
+      """.stripMargin
+
+    tEnv.executeSql(sourceDDL)
+    tEnv.executeSql(sinkDDL)
+
+    //---------------------------------------------------------------------------------------
+    // Verify writing out a source directly with the rowtime attribute
+    //---------------------------------------------------------------------------------------
+
+    execInsertSqlAndWaitResult("INSERT INTO sink SELECT * FROM src")
+
+    val expected = List(1000, 2000, 3000, 4000, 7000, 8000, 16000)
+    assertEquals(expected.sorted, TestSinkContextTableSink.ROWTIMES.sorted)
+
+    val sinkDDL2 =
+      s"""
+         |CREATE TABLE sink2 (
+         |  window_rowtime TIMESTAMP(3),
+         |  b DOUBLE
+         |) WITH (
+         |  'connector' = 'values',
+         |  'table-sink-class' = '${classOf[TestSinkContextTableSink].getName}'
+         |)
+      """.stripMargin
+    tEnv.executeSql(sinkDDL2)
+
+    //---------------------------------------------------------------------------------------
+    // Verify writing out with additional operator to generate a new rowtime attribute
+    //---------------------------------------------------------------------------------------
+
+    execInsertSqlAndWaitResult(
+      """
+        |INSERT INTO sink2
+        |SELECT
+        |  TUMBLE_ROWTIME(ts, INTERVAL '5' SECOND),
+        |  SUM(b)
+        |FROM src
+        |GROUP BY TUMBLE(ts, INTERVAL '5' SECOND)
+        |""".stripMargin
+    )
+
+    val expected2 = List(4999, 9999, 19999)
+    assertEquals(expected2.sorted, TestSinkContextTableSink.ROWTIMES.sorted)
+  }
+
+  private def localDateTime(epochSecond: Long): LocalDateTime = {
+    LocalDateTime.ofEpochSecond(epochSecond, 0, ZoneOffset.UTC)
+  }
+}
+
+/**
+ * Append test sink that outputs records with their timestamps.
+ */
+final class StringWithTimestampSink[T] extends AbstractExactlyOnceSink[T]() {
+
+  override def invoke(value: T, context: SinkFunction.Context[_]) {
+    localResults += s"${value.toString},${context.timestamp()}"
+  }
+
+  override def getResults: List[String] = super.getResults
+}
+
+/**
+ * Retract test sink that outputs records with their timestamps.
+ */
+final class StringWithTimestampRetractSink[T](tz: TimeZone) extends
+  TestingRetractSink(tz) {
+
+  def this() {
+    this(TimeZone.getTimeZone("UTC"))
+  }
+
+  override def invoke(v: (Boolean, Row), context: SinkFunction.Context[_]): Unit = {
+    this.synchronized {
+      val rowString = s"${TestSinkUtil.rowToString(v._2, tz)},${context.timestamp()}"
+
+      val tupleString = "(" + v._1.toString + "," + rowString + ")"
+      localResults += tupleString
+      if (v._1) {
+        localRetractResults += rowString
+      } else {
+        val index = localRetractResults.indexOf(rowString)
+        if (index >= 0) {
+          localRetractResults.remove(index)
+        } else {
+          throw new RuntimeException("Tried to retract a value that wasn't added first. " +
+            "This is probably an incorrectly implemented test. " +
+            "Try to set the parallelism of the sink to 1.")
+        }
+      }
+    }
+  }
+
+  override def getResults: List[String] = super.getResults
+}
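
For readers skimming this change: the commit makes the Blink planner set a single rowtime attribute as the StreamRecord timestamp when a Table is converted back into a DataStream or written through a legacy sink. Below is a minimal, self-contained sketch of how that can be observed from user code. It is not part of the commit; the object name, job name, and sample data are made up for illustration, and it only assumes the public Flink 1.11 Scala APIs that the new ITCase above also uses.

// Sketch only -- not part of the commit above; names and data are illustrative.
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.types.Row
import org.apache.flink.util.Collector

object RowtimeToDataStreamSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    val tEnv = StreamTableEnvironment.create(env)

    // Promote the first tuple field (epoch seconds) to an event-time timestamp
    // and expose it as a rowtime attribute on the Table.
    val input = env
      .fromElements((1L, "A"), (2L, "B"), (3L, "C"))
      .assignAscendingTimestamps(_._1 * 1000L)
    tEnv.createTemporaryView("t1", input.toTable(tEnv, 'ts, 'a, 'rowtime.rowtime))

    // With FLINK-21013, converting the Table back to a DataStream should carry the
    // rowtime in the StreamRecord, so ctx.timestamp() returns the event time in millis.
    tEnv.sqlQuery("SELECT a, rowtime FROM t1")
      .toAppendStream[Row]
      .process(new ProcessFunction[Row, String] {
        override def processElement(
            value: Row,
            ctx: ProcessFunction[Row, String]#Context,
            out: Collector[String]): Unit = {
          out.collect(s"$value -> ${ctx.timestamp()}")
        }
      })
      .print()

    env.execute("rowtime-to-datastream-sketch")
  }
}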