[ https://issues.apache.org/jira/browse/FLINK-19592?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Flink Jira Bot updated FLINK-19592:
-----------------------------------
    Labels: auto-deprioritized-minor auto-unassigned pull-request-available  (was: auto-unassigned pull-request-available stale-minor)
    Priority: Not a Priority  (was: Minor)

This issue was labeled "stale-minor" 7 days ago and has not received any updates so it is being deprioritized. If this ticket is actually Minor, please raise the priority and ask a committer to assign you the issue or revive the public discussion.

> MiniBatchGroupAggFunction should emit messages to prevent too early state eviction of downstream operators
> ----------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-19592
>                 URL: https://issues.apache.org/jira/browse/FLINK-19592
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Runtime
>    Affects Versions: 1.12.0
>            Reporter: Smile
>            Priority: Not a Priority
>              Labels: auto-deprioritized-minor, auto-unassigned, pull-request-available
>
> Currently,
> [GroupAggFunction|https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/GroupAggFunction.java#L183]
> will emit a retract and a new insert message when a new message with the
> same key arrives, even if the aggregate value is unchanged. According to
> [Flink-8566|https://issues.apache.org/jira/browse/FLINK-8566], this is a feature
> to prevent too early state eviction of downstream operators.
> However,
> [MiniBatchGroupAggFunction|https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/MiniBatchGroupAggFunction.java#L206]
> doesn't. Until
> [Flink-8566|https://issues.apache.org/jira/browse/FLINK-8566] is resolved,
> it should also emit these messages.
> *GroupAggFunction.java:*
> {code:java}
> if (!stateCleaningEnabled && equaliser.equals(prevAggValue, newAggValue)) {
>     // newRow is the same as before and state cleaning is not enabled.
>     // We do not emit retraction and acc message.
>     // If state cleaning is enabled, we have to emit messages to prevent too early
>     // state eviction of downstream operators.
>     return;
> } else {
>     // retract previous result
>     if (generateUpdateBefore) {
>         // prepare UPDATE_BEFORE message for previous row
>         resultRow.replace(currentKey, prevAggValue).setRowKind(RowKind.UPDATE_BEFORE);
>         out.collect(resultRow);
>     }
>     // prepare UPDATE_AFTER message for new row
>     resultRow.replace(currentKey, newAggValue).setRowKind(RowKind.UPDATE_AFTER);
> }
> {code}
> *MiniBatchGroupAggFunction.java:*
>
> {code:java}
> if (!equaliser.equals(prevAggValue, newAggValue)) {
>     // new row is not same with prev row
>     if (generateUpdateBefore) {
>         // prepare UPDATE_BEFORE message for previous row
>         resultRow.replace(currentKey, prevAggValue).setRowKind(RowKind.UPDATE_BEFORE);
>         out.collect(resultRow);
>     }
>     // prepare UPDATE_AFTER message for new row
>     resultRow.replace(currentKey, newAggValue).setRowKind(RowKind.UPDATE_AFTER);
>     out.collect(resultRow);
> }
> // new row is same with prev row, no need to output
> {code}
>
>
>
> --
This message was sent by Atlassian Jira
(v8.20.1#820001)
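
For readers who want to see the behavioural difference in isolation, here is a minimal, self-contained sketch of the two emit decisions quoted in the issue. It is not Flink code: the class `EmitDecisionSketch`, both method names, and the string-based "messages" are hypothetical stand-ins, and `Objects.equals` stands in for the generated `equaliser`. It only models the branch logic shown in the two snippets, assuming the aggregate value is unchanged between the previous and the new row.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

public class EmitDecisionSketch {

    /**
     * Models the GroupAggFunction branch from the issue: output is suppressed
     * only when the value is unchanged AND state cleaning is disabled.
     */
    public static List<String> groupAggEmit(
            boolean stateCleaningEnabled, boolean generateUpdateBefore, Object prev, Object next) {
        List<String> out = new ArrayList<>();
        if (!stateCleaningEnabled && Objects.equals(prev, next)) {
            return out; // unchanged value, no state cleaning: emit nothing
        }
        if (generateUpdateBefore) {
            out.add("UPDATE_BEFORE(" + prev + ")");
        }
        out.add("UPDATE_AFTER(" + next + ")");
        return out;
    }

    /**
     * Models the MiniBatchGroupAggFunction branch from the issue: output is
     * suppressed whenever the value is unchanged, regardless of state cleaning.
     */
    public static List<String> miniBatchEmit(
            boolean generateUpdateBefore, Object prev, Object next) {
        List<String> out = new ArrayList<>();
        if (!Objects.equals(prev, next)) {
            if (generateUpdateBefore) {
                out.add("UPDATE_BEFORE(" + prev + ")");
            }
            out.add("UPDATE_AFTER(" + next + ")");
        }
        return out;
    }

    public static void main(String[] args) {
        // Same key, unchanged aggregate value, state cleaning enabled.
        System.out.println(groupAggEmit(true, true, 5L, 5L));  // [UPDATE_BEFORE(5), UPDATE_AFTER(5)]
        System.out.println(miniBatchEmit(true, 5L, 5L));       // []
    }
}
```

In this model, the non-mini-batch path still sends a retract/update pair that can refresh downstream state timers, while the mini-batch path stays silent. That silence is the gap the issue describes.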