[ https://issues.apache.org/jira/browse/FLINK-16047?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17058858#comment-17058858 ]
Jark Wu commented on FLINK-16047: --------------------------------- I will pick up this issue and submit a PR soon. > Blink planner produces wrong aggregate results with state clean up > ------------------------------------------------------------------ > > Key: FLINK-16047 > URL: https://issues.apache.org/jira/browse/FLINK-16047 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner > Affects Versions: 1.9.0 > Reporter: Timo Walther > Priority: Critical > Fix For: 1.9.3, 1.10.1, 1.11.0 > > > It seems that FLINK-10674 has not been ported to the Blink planner. > Because state clean up happens in processing time, it might be the case that > retractions are arriving after the state has been cleaned up. Before these > changes, a new accumulator was created and invalid retraction messages were > emitted. This change drops retraction messages for which no accumulator > exists. > These lines are missing in > {{org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction}}: > {code} > if (null == accumulators) { > // Don't create a new accumulator for a retraction message. This > // might happen if the retraction message is the first message for the > // key or after a state clean up. > if (!inputC.change) { > return > } > // first accumulate message > firstRow = true > accumulators = function.createAccumulators() > } else { > firstRow = false > } > {code} > The bug has not been verified. I spotted it only by looking at the code. -- This message was sent by Atlassian Jira (v8.3.4#803005)