Repository: flink Updated Branches: refs/heads/master 36830adac -> 61914abff
[FLINK-6650] [table] Improve the error message for toAppendStream This closes #3958. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/61914abf Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/61914abf Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/61914abf Branch: refs/heads/master Commit: 61914abffa83a55d4f0a339dbcf64c540962a9cd Parents: 36830ad Author: sunjincheng121 <sunjincheng...@gmail.com> Authored: Mon May 22 17:04:13 2017 +0800 Committer: twalthr <twal...@apache.org> Committed: Wed May 24 16:02:40 2017 +0200 ---------------------------------------------------------------------- .../org/apache/flink/table/api/StreamTableEnvironment.scala | 2 +- .../plan/nodes/datastream/DataStreamGroupWindowAggregate.scala | 4 ++-- .../table/plan/nodes/datastream/DataStreamOverAggregate.scala | 4 ++-- .../org/apache/flink/table/runtime/aggregate/AggregateUtil.scala | 3 +++ 4 files changed, 8 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/61914abf/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala index c430b21..bc5038d 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala @@ -630,7 +630,7 @@ abstract class StreamTableEnvironment( if (!withChangeFlag && !isAppendOnly(logicalPlan)) { throw new TableException( "Table is not an append-only table. " + - "Output needs to handle update and delete changes.") + "Use the toRetractStream() in order to handle add and retract messages.") } // get CRow plan http://git-wip-us.apache.org/repos/asf/flink/blob/61914abf/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala index 1ac013a..d860cbe 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala @@ -126,9 +126,9 @@ class DataStreamGroupWindowAggregate( val physicalNamedProperties = namedProperties .filter(np => !FlinkTypeFactory.isTimeIndicatorType(np.property.resultType)) - val consumeRetraction = DataStreamRetractionRules.isAccRetract(input) + val inputIsAccRetract = DataStreamRetractionRules.isAccRetract(input) - if (consumeRetraction) { + if (inputIsAccRetract) { throw new TableException( "Retraction on windowed GroupBy aggregation is not supported yet. " + "Note: Windowed GroupBy aggregation should not follow a " + http://git-wip-us.apache.org/repos/asf/flink/blob/61914abf/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala index a9fbf02..08f0356 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala @@ -116,9 +116,9 @@ class DataStreamOverAggregate( val inputDS = input.asInstanceOf[DataStreamRel].translateToPlan(tableEnv, queryConfig) - val consumeRetraction = DataStreamRetractionRules.isAccRetract(input) + val inputIsAccRetract = DataStreamRetractionRules.isAccRetract(input) - if (consumeRetraction) { + if (inputIsAccRetract) { throw new TableException( "Retraction on Over window aggregation is not supported yet. " + "Note: Over window aggregation should not follow a non-windowed GroupBy aggregation.") http://git-wip-us.apache.org/repos/asf/flink/blob/61914abf/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala index 8073959..2907b99 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala @@ -152,6 +152,9 @@ object AggregateUtil { * @param inputRowType Input row type * @param inputFieldTypes Types of the physical input fields * @param groupings the position (in the input Row) of the grouping keys + * @param queryConfig The configuration of the query to generate. + * @param generateRetraction It is a tag that indicates whether generate retract record. + * @param consumeRetraction It is a tag that indicates whether consume the retract record. * @return [[org.apache.flink.streaming.api.functions.ProcessFunction]] */ private[flink] def createGroupAggregateFunction(