[ https://issues.apache.org/jira/browse/FLINK-19592?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17356152#comment-17356152 ]
Smile commented on FLINK-19592:
-------------------------------

[~libenchao] Could you please help review this PR?

> MiniBatchGroupAggFunction should emit messages to prevent too early state eviction of downstream operators
> ----------------------------------------------------------------------------------------------------------
>
> Key: FLINK-19592
> URL: https://issues.apache.org/jira/browse/FLINK-19592
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Runtime
> Affects Versions: 1.12.0
> Reporter: Smile
> Priority: Minor
> Labels: auto-unassigned, pull-request-available
>
> Currently, [GroupAggFunction|https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/GroupAggFunction.java#L183]
> emits a retraction (UPDATE_BEFORE) and a new UPDATE_AFTER message when a new record with the same key arrives and state cleaning is enabled, even if the aggregate value has not changed. According to [FLINK-8566|https://issues.apache.org/jira/browse/FLINK-8566], this is intentional: it prevents downstream operators from evicting their state too early.
>
> However, [MiniBatchGroupAggFunction|https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/MiniBatchGroupAggFunction.java#L206]
> does not: it emits messages only when the aggregate value has changed, regardless of whether state cleaning is enabled. Until [FLINK-8566|https://issues.apache.org/jira/browse/FLINK-8566] is resolved, it should emit these messages as well.
>
> *GroupAggFunction.java:*
> {code:java}
> if (!stateCleaningEnabled && equaliser.equals(prevAggValue, newAggValue)) {
>     // newRow is the same as before and state cleaning is not enabled.
>     // We do not emit retraction and acc message.
>     // If state cleaning is enabled, we have to emit messages to prevent too early
>     // state eviction of downstream operators.
>     return;
> } else {
>     // retract previous result
>     if (generateUpdateBefore) {
>         // prepare UPDATE_BEFORE message for previous row
>         resultRow.replace(currentKey, prevAggValue).setRowKind(RowKind.UPDATE_BEFORE);
>         out.collect(resultRow);
>     }
>     // prepare UPDATE_AFTER message for new row
>     resultRow.replace(currentKey, newAggValue).setRowKind(RowKind.UPDATE_AFTER);
> }
> {code}
>
> *MiniBatchGroupAggFunction.java:*
> {code:java}
> if (!equaliser.equals(prevAggValue, newAggValue)) {
>     // new row is not same with prev row
>     if (generateUpdateBefore) {
>         // prepare UPDATE_BEFORE message for previous row
>         resultRow.replace(currentKey, prevAggValue).setRowKind(RowKind.UPDATE_BEFORE);
>         out.collect(resultRow);
>     }
>     // prepare UPDATE_AFTER message for new row
>     resultRow.replace(currentKey, newAggValue).setRowKind(RowKind.UPDATE_AFTER);
>     out.collect(resultRow);
> }
> // new row is same with prev row, no need to output
> {code}

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
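To make the difference between the two quoted branches concrete, here is a minimal, Flink-free sketch of the per-key emission decision. It rests on several simplifying assumptions. The class `EmitDecisionSketch`, its local `Kind` enum (a stand-in for Flink's `RowKind`), and the helpers `msg`, `emitUpdate`, `groupAgg`, `miniBatchCurrent` and `miniBatchProposed` are all hypothetical. Aggregate values are modeled as plain `long`s compared with `==` rather than through Flink's `RecordEqualiser`, and the first-record INSERT path is not modeled. `miniBatchProposed` shows only one possible fix, reusing GroupAggFunction's `stateCleaningEnabled` condition. It is not necessarily the change made in the PR under review.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, Flink-free model of the per-key emission decision (not Flink's API).
final class EmitDecisionSketch {

    // Local stand-in for org.apache.flink.types.RowKind; only the two update kinds are modeled.
    enum Kind {
        UPDATE_BEFORE("-U"), UPDATE_AFTER("+U");

        final String shortString;

        Kind(String shortString) {
            this.shortString = shortString;
        }
    }

    // Formats one output message as "<kind>(<key>,<aggValue>)".
    static String msg(Kind kind, String key, long aggValue) {
        return kind.shortString + "(" + key + "," + aggValue + ")";
    }

    // Builds the retract/update pair; the UPDATE_BEFORE half is optional.
    private static List<String> emitUpdate(String key, long prev, long next, boolean generateUpdateBefore) {
        List<String> out = new ArrayList<>();
        if (generateUpdateBefore) {
            out.add(msg(Kind.UPDATE_BEFORE, key, prev));
        }
        out.add(msg(Kind.UPDATE_AFTER, key, next));
        return out;
    }

    // Models the GroupAggFunction branch: stay silent only if the value is unchanged
    // AND state cleaning is disabled.
    static List<String> groupAgg(String key, long prev, long next,
                                 boolean stateCleaningEnabled, boolean generateUpdateBefore) {
        if (!stateCleaningEnabled && prev == next) {
            return new ArrayList<>();
        }
        return emitUpdate(key, prev, next, generateUpdateBefore);
    }

    // Models the current MiniBatchGroupAggFunction branch: emit only when the value changed,
    // ignoring whether state cleaning is enabled.
    static List<String> miniBatchCurrent(String key, long prev, long next, boolean generateUpdateBefore) {
        if (prev == next) {
            return new ArrayList<>();
        }
        return emitUpdate(key, prev, next, generateUpdateBefore);
    }

    // One possible fix (an assumption): apply the same condition as groupAgg.
    static List<String> miniBatchProposed(String key, long prev, long next,
                                          boolean stateCleaningEnabled, boolean generateUpdateBefore) {
        return groupAgg(key, prev, next, stateCleaningEnabled, generateUpdateBefore);
    }

    public static void main(String[] args) {
        // The same key arrives again, the aggregate stays at 3, and state cleaning is enabled.
        System.out.println(groupAgg("k", 3, 3, true, true));          // [-U(k,3), +U(k,3)]
        System.out.println(miniBatchCurrent("k", 3, 3, true));        // []
        System.out.println(miniBatchProposed("k", 3, 3, true, true)); // [-U(k,3), +U(k,3)]
    }
}
```

In this model, the current mini-batch branch produces nothing for an unchanged value. A downstream operator with state TTL enabled therefore never sees the key touched again and may expire its state. The proposed branch keeps refreshing it, just as the non-mini-batch path does.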