[FLINK-6486] [table] Pass RowTypeInfo to CodeGenerator instead of CRowTypeInfo.

This closes #3850.

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/dd799c74
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/dd799c74
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/dd799c74

Branch: refs/heads/release-1.3
Commit: dd799c746cc464550222b9b126e3f60c5259df9f
Parents: 34cff35
Author: Hequn Cheng <chenghe...@gmail.com>
Authored: Mon May 8 20:55:51 2017 +0800
Committer: Fabian Hueske <fhue...@apache.org>
Committed: Tue May 9 18:53:14 2017 +0200

----------------------------------------------------------------------
 .../table/plan/nodes/datastream/DataStreamGroupAggregate.scala | 2 +-
 .../plan/nodes/datastream/DataStreamGroupWindowAggregate.scala | 2 +-
 .../table/plan/nodes/datastream/DataStreamOverAggregate.scala | 2 +-
 3 files changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/dd799c74/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala
index 18f1fc8..506c0cb 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala
@@ -115,7 +115,7 @@ class DataStreamGroupAggregate(
     val generator = new CodeGenerator(
       tableEnv.getConfig,
       false,
-      inputDS.getType)
+      inputSchema.physicalTypeInfo)

     val aggString = aggregationToString(
       inputSchema.logicalType,

http://git-wip-us.apache.org/repos/asf/flink/blob/dd799c74/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
index c38e5af..ef207b0 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
@@ -142,7 +142,7 @@ class DataStreamGroupWindowAggregate(
     val generator = new CodeGenerator(
       tableEnv.getConfig,
       false,
-      inputDS.getType)
+      inputSchema.physicalTypeInfo)

     val needMerge = window match {
       case SessionGroupWindow(_, _, _) => true

http://git-wip-us.apache.org/repos/asf/flink/blob/dd799c74/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
index e823cd6..4061242 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
@@ -116,7 +116,7 @@ class DataStreamOverAggregate(
     val generator = new CodeGenerator(
       tableEnv.getConfig,
       false,
-      inputDS.getType)
+      inputSchema.physicalTypeInfo)

     val timeType = schema.logicalType
       .getFieldList