[FLINK-6486] [table] Pass RowTypeInfo to CodeGenerator instead of CRowTypeInfo.
This closes #3850.

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b5ddbe5c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b5ddbe5c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b5ddbe5c

Branch: refs/heads/master
Commit: b5ddbe5c360003b210a1212e54e6c50b8af538fa
Parents: e2cb221
Author: Hequn Cheng <chenghe...@gmail.com>
Authored: Mon May 8 20:55:51 2017 +0800
Committer: Fabian Hueske <fhue...@apache.org>
Committed: Tue May 9 18:50:20 2017 +0200

----------------------------------------------------------------------
 .../table/plan/nodes/datastream/DataStreamGroupAggregate.scala | 2 +-
 .../plan/nodes/datastream/DataStreamGroupWindowAggregate.scala | 2 +-
 .../table/plan/nodes/datastream/DataStreamOverAggregate.scala | 2 +-
 3 files changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/b5ddbe5c/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala
index 18f1fc8..506c0cb 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala
@@ -115,7 +115,7 @@ class DataStreamGroupAggregate(
     val generator = new CodeGenerator(
       tableEnv.getConfig,
       false,
-      inputDS.getType)
+      inputSchema.physicalTypeInfo)
 
     val aggString = aggregationToString(
       inputSchema.logicalType,
http://git-wip-us.apache.org/repos/asf/flink/blob/b5ddbe5c/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
index c38e5af..ef207b0 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
@@ -142,7 +142,7 @@ class DataStreamGroupWindowAggregate(
     val generator = new CodeGenerator(
       tableEnv.getConfig,
       false,
-      inputDS.getType)
+      inputSchema.physicalTypeInfo)
 
     val needMerge = window match {
       case SessionGroupWindow(_, _, _) => true

http://git-wip-us.apache.org/repos/asf/flink/blob/b5ddbe5c/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
index e823cd6..4061242 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
@@ -116,7 +116,7 @@ class DataStreamOverAggregate(
     val generator = new CodeGenerator(
       tableEnv.getConfig,
       false,
-      inputDS.getType)
+      inputSchema.physicalTypeInfo)
 
     val timeType = schema.logicalType
       .getFieldList