[ https://issues.apache.org/jira/browse/FLINK-27539?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Jane Chan resolved FLINK-27539.
-------------------------------
    Resolution: Duplicate

> support consuming update and delete changes In Windowing TVFs
> -------------------------------------------------------------
>
>                 Key: FLINK-27539
>                 URL: https://issues.apache.org/jira/browse/FLINK-27539
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table SQL / API
>    Affects Versions: 1.15.0
>            Reporter: hjw
>            Priority: Major
>
> custom_kafka is a CDC table.
> SQL:
> {code:java}
> select DATE_FORMAT(window_end,'yyyy-MM-dd') as date_str,sum(money) as total,name
> from TABLE(CUMULATE(TABLE custom_kafka,descriptor(createtime),interval '1' MINUTES,interval '1' DAY ))
> where status='1'
> group by name,window_start,window_end;
> {code}
> Error:
> {code:java}
> Exception in thread "main" org.apache.flink.table.api.TableException: StreamPhysicalWindowAggregate doesn't support consuming update and delete changes which is produced by node TableSourceScan(table=[[default_catalog, default_database, custom_kafka, watermark=[-(createtime, 5000:INTERVAL SECOND)]]], fields=[name, money, status, createtime, operation_ts])
>     at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.createNewNode(FlinkChangelogModeInferenceProgram.scala:396)
>     at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visit(FlinkChangelogModeInferenceProgram.scala:315)
>     at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visitChild(FlinkChangelogModeInferenceProgram.scala:353)
>     at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.$anonfun$visitChildren$1(FlinkChangelogModeInferenceProgram.scala:342)
>     at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.$anonfun$visitChildren$1$adapted(FlinkChangelogModeInferenceProgram.scala:341)
>     at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
>     at scala.collection.immutable.Range.foreach(Range.scala:155)
>     at scala.collection.TraversableLike.map(TraversableLike.scala:233)
>     at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
>     at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>     at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visitChildren(FlinkChangelogModeInferenceProgram.scala:341)
> {code}
> But I found that Group Window Aggregation works when using a CDC table:
> {code:java}
> select DATE_FORMAT(TUMBLE_END(createtime,interval '10' MINUTES),'yyyy-MM-dd') as date_str,sum(money) as total,name
> from custom_kafka
> where status='1'
> group by name,TUMBLE(createtime,interval '10' MINUTES)
> {code}

--
This message was sent by Atlassian Jira
(v8.20.10#820010)