Repository: flink
Updated Branches:
  refs/heads/master 51b5b53c7 -> 691c48a14


[FLINK-8096] [table] Fix time attribute materialization when writing to TableSink

This closes #5025.
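
Background: before this fix, translating a Table into a DataStream for a TableSink used the row type of the original logical plan, in which time attributes are still typed as time indicators. The optimizer materializes those attributes into regular SQL timestamps, so the logical row type no longer matched the optimized plan when such a table was written to a TableSink. A minimal sketch of the affected scenario, modeled on the new test case in the diff below (data, TimestampWithEqualWatermark, and MemoryTableSinkUtil are the test's own helpers, not new API):

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    val tEnv = TableEnvironment.getTableEnvironment(env)

    env.fromCollection(data)
      .assignTimestampsAndWatermarks(new TimestampWithEqualWatermark())
      // 'rowtime.rowtime declares an event-time attribute on the table
      .toTable(tEnv, 'rowtime.rowtime, 'int, 'double, 'float, 'bigdec, 'string)
      // selecting the time attribute forces it to be materialized as a
      // regular timestamp when the optimized plan is translated
      .select('rowtime, 'int, 'string)
      .writeToSink(new MemoryTableSinkUtil.UnsafeMemoryAppendTableSink)

    env.execute()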


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/691c48a1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/691c48a1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/691c48a1

Branch: refs/heads/master
Commit: 691c48a14a138e4ff3ae330b7c9cfc0e596a9afa
Parents: 51b5b53
Author: Dian Fu <fudian...@alibaba-inc.com>
Authored: Fri Nov 17 10:53:31 2017 +0800
Committer: twalthr <twal...@apache.org>
Committed: Mon Nov 20 10:52:27 2017 +0100

----------------------------------------------------------------------
 .../table/api/StreamTableEnvironment.scala      | 39 ++++++++++++--------
 .../runtime/stream/TimeAttributesITCase.scala   | 26 ++++++++++++-
 2 files changed, 49 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/691c48a1/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
index e80acca..920da2e 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
@@ -234,11 +234,12 @@ abstract class StreamTableEnvironment(
             "UpsertStreamTableSink requires that Table has a full primary keys 
if it is updated.")
         }
         val outputType = sink.getOutputType
+        val resultType = getResultType(table.getRelNode, optimizedPlan)
         // translate the Table into a DataStream and provide the type that the TableSink expects.
         val result: DataStream[T] =
           translate(
             optimizedPlan,
-            table.getRelNode.getRowType,
+            resultType,
             streamQueryConfig,
             withChangeFlag = true)(outputType)
         // Give the DataStream to the TableSink to emit it.
@@ -254,11 +255,12 @@ abstract class StreamTableEnvironment(
             "AppendStreamTableSink requires that Table has only insert 
changes.")
         }
         val outputType = sink.getOutputType
+        val resultType = getResultType(table.getRelNode, optimizedPlan)
         // translate the Table into a DataStream and provide the type that the TableSink expects.
         val result: DataStream[T] =
           translate(
             optimizedPlan,
-            table.getRelNode.getRowType,
+            resultType,
             streamQueryConfig,
             withChangeFlag = false)(outputType)
         // Give the DataStream to the TableSink to emit it.
@@ -727,19 +729,7 @@ abstract class StreamTableEnvironment(
     val relNode = table.getRelNode
     val dataStreamPlan = optimize(relNode, updatesAsRetraction)
 
-    // zip original field names with optimized field types
-    val fieldTypes = relNode.getRowType.getFieldList.asScala
-      .zip(dataStreamPlan.getRowType.getFieldList.asScala)
-      // get name of original plan and type of optimized plan
-      .map(x => (x._1.getName, x._2.getType))
-      // add field indexes
-      .zipWithIndex
-      // build new field types
-      .map(x => new RelDataTypeFieldImpl(x._1._1, x._2, x._1._2))
-
-    // build a record type from list of field types
-    val rowType = new RelRecordType(
-      fieldTypes.toList.asInstanceOf[List[RelDataTypeField]].asJava)
+    val rowType = getResultType(relNode, dataStreamPlan)
 
     translate(dataStreamPlan, rowType, queryConfig, withChangeFlag)
   }
@@ -852,6 +842,25 @@ abstract class StreamTableEnvironment(
   }
 
   /**
+    * Returns the record type of the optimized plan with field names of the logical plan.
+    */
+  private def getResultType(originRelNode: RelNode, optimizedPlan: RelNode): RelRecordType = {
+    // zip original field names with optimized field types
+    val fieldTypes = originRelNode.getRowType.getFieldList.asScala
+      .zip(optimizedPlan.getRowType.getFieldList.asScala)
+      // get name of original plan and type of optimized plan
+      .map(x => (x._1.getName, x._2.getType))
+      // add field indexes
+      .zipWithIndex
+      // build new field types
+      .map(x => new RelDataTypeFieldImpl(x._1._1, x._2, x._1._2))
+
+    // build a record type from list of field types
+    new RelRecordType(
+      fieldTypes.toList.asInstanceOf[List[RelDataTypeField]].asJava)
+  }
+
+  /**
    * Returns the AST of the specified Table API and SQL queries and the execution plan to compute
     * the result of the given [[Table]].
     *
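
For reference, the extracted getResultType helper keeps the field names of the logical plan but takes the field types from the optimized plan, where time indicators have been materialized. The same zip/zipWithIndex pattern in isolation, a sketch using plain Scala tuples in place of Calcite's RelDataTypeField (all names here are illustrative):

    // Hypothetical stand-in for Calcite's RelDataTypeFieldImpl.
    case class Field(name: String, index: Int, tpe: String)

    val logicalFields   = Seq(("rowtime", "TIME_INDICATOR"), ("int", "INTEGER"))
    val optimizedFields = Seq(("rowtime", "TIMESTAMP"),      ("int", "INTEGER"))

    val resultFields = logicalFields
      // pair each logical field with its optimized counterpart
      .zip(optimizedFields)
      // keep the name of the logical plan and the type of the optimized plan
      .map { case ((name, _), (_, tpe)) => (name, tpe) }
      // add field indexes
      .zipWithIndex
      .map { case ((name, tpe), i) => Field(name, i, tpe) }
    // resultFields: Seq(Field("rowtime", 0, "TIMESTAMP"), Field("int", 1, "INTEGER"))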

http://git-wip-us.apache.org/repos/asf/flink/blob/691c48a1/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala
index 5086601..a301354 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala
@@ -36,7 +36,7 @@ import org.apache.flink.table.api.{TableEnvironment, TableSchema, Types}
 import org.apache.flink.table.expressions.{ExpressionParser, TimeIntervalUnit}
 import org.apache.flink.table.runtime.stream.TimeAttributesITCase.{AtomicTimestampWithEqualWatermark, TestPojo, TimestampWithEqualWatermark, TimestampWithEqualWatermarkPojo}
 import org.apache.flink.table.runtime.utils.StreamITCase
-import org.apache.flink.table.utils.TestTableSourceWithTime
+import org.apache.flink.table.utils.{MemoryTableSinkUtil, TestTableSourceWithTime}
 import org.apache.flink.types.Row
 import org.junit.Assert._
 import org.junit.Test
@@ -179,6 +179,30 @@ class TimeAttributesITCase extends StreamingMultipleProgramsTestBase {
   }
 
   @Test
+  def testTableSink(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    MemoryTableSinkUtil.clear
+
+    val stream = env
+      .fromCollection(data)
+      .assignTimestampsAndWatermarks(new TimestampWithEqualWatermark())
+    stream.toTable(tEnv, 'rowtime.rowtime, 'int, 'double, 'float, 'bigdec, 'string)
+      .filter('rowtime.cast(Types.LONG) > 4)
+      .select('rowtime, 'rowtime.floor(TimeIntervalUnit.DAY), 'rowtime.ceil(TimeIntervalUnit.DAY))
+      .writeToSink(new MemoryTableSinkUtil.UnsafeMemoryAppendTableSink)
+
+    env.execute()
+
+    val expected = Seq(
+      "1970-01-01 00:00:00.007,1970-01-01 00:00:00.0,1970-01-02 00:00:00.0",
+      "1970-01-01 00:00:00.008,1970-01-01 00:00:00.0,1970-01-02 00:00:00.0",
+      "1970-01-01 00:00:00.016,1970-01-01 00:00:00.0,1970-01-02 00:00:00.0")
+    assertEquals(expected.sorted, MemoryTableSinkUtil.results.sorted)
+  }
+
+  @Test
   def testTableFunction(): Unit = {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
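
A note on the expected strings in the new test: the materialized rowtime reaches the sink as a java.sql.Timestamp, and floor/ceil to DAY snap the 7, 8 and 16 ms event times to the epoch day boundaries. A quick sanity check of that formatting (a sketch that assumes a UTC default time zone, on which java.sql.Timestamp.toString depends):

    import java.sql.Timestamp

    println(new Timestamp(7L))          // 1970-01-01 00:00:00.007
    println(new Timestamp(0L))          // 1970-01-01 00:00:00.0  (floor to DAY)
    println(new Timestamp(86400000L))   // 1970-01-02 00:00:00.0  (ceil to DAY)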
