Repository: flink Updated Branches: refs/heads/master 51b5b53c7 -> 691c48a14
[FLINK-8096] [table] Fix time attribute materialization when writing to TableSink This closes #5025. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/691c48a1 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/691c48a1 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/691c48a1 Branch: refs/heads/master Commit: 691c48a14a138e4ff3ae330b7c9cfc0e596a9afa Parents: 51b5b53 Author: Dian Fu <fudian...@alibaba-inc.com> Authored: Fri Nov 17 10:53:31 2017 +0800 Committer: twalthr <twal...@apache.org> Committed: Mon Nov 20 10:52:27 2017 +0100 ---------------------------------------------------------------------- .../table/api/StreamTableEnvironment.scala | 39 ++++++++++++-------- .../runtime/stream/TimeAttributesITCase.scala | 26 ++++++++++++- 2 files changed, 49 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/691c48a1/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala index e80acca..920da2e 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala @@ -234,11 +234,12 @@ abstract class StreamTableEnvironment( "UpsertStreamTableSink requires that Table has a full primary keys if it is updated.") } val outputType = sink.getOutputType + val resultType = getResultType(table.getRelNode, optimizedPlan) // translate the Table into a DataStream and provide the type that the TableSink expects. val result: DataStream[T] = translate( optimizedPlan, - table.getRelNode.getRowType, + resultType, streamQueryConfig, withChangeFlag = true)(outputType) // Give the DataStream to the TableSink to emit it. @@ -254,11 +255,12 @@ abstract class StreamTableEnvironment( "AppendStreamTableSink requires that Table has only insert changes.") } val outputType = sink.getOutputType + val resultType = getResultType(table.getRelNode, optimizedPlan) // translate the Table into a DataStream and provide the type that the TableSink expects. val result: DataStream[T] = translate( optimizedPlan, - table.getRelNode.getRowType, + resultType, streamQueryConfig, withChangeFlag = false)(outputType) // Give the DataStream to the TableSink to emit it. @@ -727,19 +729,7 @@ abstract class StreamTableEnvironment( val relNode = table.getRelNode val dataStreamPlan = optimize(relNode, updatesAsRetraction) - // zip original field names with optimized field types - val fieldTypes = relNode.getRowType.getFieldList.asScala - .zip(dataStreamPlan.getRowType.getFieldList.asScala) - // get name of original plan and type of optimized plan - .map(x => (x._1.getName, x._2.getType)) - // add field indexes - .zipWithIndex - // build new field types - .map(x => new RelDataTypeFieldImpl(x._1._1, x._2, x._1._2)) - - // build a record type from list of field types - val rowType = new RelRecordType( - fieldTypes.toList.asInstanceOf[List[RelDataTypeField]].asJava) + val rowType = getResultType(relNode, dataStreamPlan) translate(dataStreamPlan, rowType, queryConfig, withChangeFlag) } @@ -852,6 +842,25 @@ abstract class StreamTableEnvironment( } /** + * Returns the record type of the optimized plan with field names of the logical plan. + */ + private def getResultType(originRelNode: RelNode, optimizedPlan: RelNode): RelRecordType = { + // zip original field names with optimized field types + val fieldTypes = originRelNode.getRowType.getFieldList.asScala + .zip(optimizedPlan.getRowType.getFieldList.asScala) + // get name of original plan and type of optimized plan + .map(x => (x._1.getName, x._2.getType)) + // add field indexes + .zipWithIndex + // build new field types + .map(x => new RelDataTypeFieldImpl(x._1._1, x._2, x._1._2)) + + // build a record type from list of field types + new RelRecordType( + fieldTypes.toList.asInstanceOf[List[RelDataTypeField]].asJava) + } + + /** * Returns the AST of the specified Table API and SQL queries and the execution plan to compute * the result of the given [[Table]]. * http://git-wip-us.apache.org/repos/asf/flink/blob/691c48a1/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala index 5086601..a301354 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala @@ -36,7 +36,7 @@ import org.apache.flink.table.api.{TableEnvironment, TableSchema, Types} import org.apache.flink.table.expressions.{ExpressionParser, TimeIntervalUnit} import org.apache.flink.table.runtime.stream.TimeAttributesITCase.{AtomicTimestampWithEqualWatermark, TestPojo, TimestampWithEqualWatermark, TimestampWithEqualWatermarkPojo} import org.apache.flink.table.runtime.utils.StreamITCase -import org.apache.flink.table.utils.TestTableSourceWithTime +import org.apache.flink.table.utils.{MemoryTableSinkUtil, TestTableSourceWithTime} import org.apache.flink.types.Row import org.junit.Assert._ import org.junit.Test @@ -179,6 +179,30 @@ class TimeAttributesITCase extends StreamingMultipleProgramsTestBase { } @Test + def testTableSink(): Unit = { + val env = StreamExecutionEnvironment.getExecutionEnvironment + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) + val tEnv = TableEnvironment.getTableEnvironment(env) + MemoryTableSinkUtil.clear + + val stream = env + .fromCollection(data) + .assignTimestampsAndWatermarks(new TimestampWithEqualWatermark()) + stream.toTable(tEnv, 'rowtime.rowtime, 'int, 'double, 'float, 'bigdec, 'string) + .filter('rowtime.cast(Types.LONG) > 4) + .select('rowtime, 'rowtime.floor(TimeIntervalUnit.DAY), 'rowtime.ceil(TimeIntervalUnit.DAY)) + .writeToSink(new MemoryTableSinkUtil.UnsafeMemoryAppendTableSink) + + env.execute() + + val expected = Seq( + "1970-01-01 00:00:00.007,1970-01-01 00:00:00.0,1970-01-02 00:00:00.0", + "1970-01-01 00:00:00.008,1970-01-01 00:00:00.0,1970-01-02 00:00:00.0", + "1970-01-01 00:00:00.016,1970-01-01 00:00:00.0,1970-01-02 00:00:00.0") + assertEquals(expected.sorted, MemoryTableSinkUtil.results.sorted) + } + + @Test def testTableFunction(): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)