[FLINK-6486] [table] Pass RowTypeInfo to CodeGenerator instead of CRowTypeInfo.

This closes #3850.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/dd799c74
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/dd799c74
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/dd799c74

Branch: refs/heads/release-1.3
Commit: dd799c746cc464550222b9b126e3f60c5259df9f
Parents: 34cff35
Author: Hequn Cheng <chenghe...@gmail.com>
Authored: Mon May 8 20:55:51 2017 +0800
Committer: Fabian Hueske <fhue...@apache.org>
Committed: Tue May 9 18:53:14 2017 +0200

----------------------------------------------------------------------
 .../table/plan/nodes/datastream/DataStreamGroupAggregate.scala     | 2 +-
 .../plan/nodes/datastream/DataStreamGroupWindowAggregate.scala     | 2 +-
 .../table/plan/nodes/datastream/DataStreamOverAggregate.scala      | 2 +-
 3 files changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/dd799c74/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala
index 18f1fc8..506c0cb 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala
@@ -115,7 +115,7 @@ class DataStreamGroupAggregate(
     val generator = new CodeGenerator(
       tableEnv.getConfig,
       false,
-      inputDS.getType)
+      inputSchema.physicalTypeInfo)
 
     val aggString = aggregationToString(
       inputSchema.logicalType,

http://git-wip-us.apache.org/repos/asf/flink/blob/dd799c74/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
index c38e5af..ef207b0 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
@@ -142,7 +142,7 @@ class DataStreamGroupWindowAggregate(
     val generator = new CodeGenerator(
       tableEnv.getConfig,
       false,
-      inputDS.getType)
+      inputSchema.physicalTypeInfo)
 
     val needMerge = window match {
       case SessionGroupWindow(_, _, _) => true

http://git-wip-us.apache.org/repos/asf/flink/blob/dd799c74/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
index e823cd6..4061242 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
@@ -116,7 +116,7 @@ class DataStreamOverAggregate(
     val generator = new CodeGenerator(
       tableEnv.getConfig,
       false,
-      inputDS.getType)
+      inputSchema.physicalTypeInfo)
 
     val timeType = schema.logicalType
       .getFieldList

Reply via email to