[ 
https://issues.apache.org/jira/browse/FLINK-7101?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16073795#comment-16073795
 ] 

Fabian Hueske commented on FLINK-7101:
--------------------------------------

That's a good find [~sunjincheng121] and a tricky question.

I think the best strategy is to completely ignore retraction records if 
{{inputCnt == 0}} or {{inputCnt == null}}. 
So I would actually exit the method before we modify the accumulator.
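
A minimal sketch of that early exit, assuming a simplified operator that keeps a per-key input count and an int sum. The class {{RetractionGuardSketch}} and its methods are hypothetical stand-ins, not the actual {{GroupAggProcessFunction}} code, and the emission of retract/accumulate messages is left out:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the aggregate operator; not Flink's GroupAggProcessFunction.
class RetractionGuardSketch {
    // Per-key state: number of accumulated inputs and a running int sum.
    final Map<String, Long> inputCnt = new HashMap<>();
    final Map<String, Integer> sum = new HashMap<>();

    // Returns the new sum, or null if the record was ignored.
    Integer process(String key, int value, boolean isAccumulate) {
        Long cnt = inputCnt.get(key);
        if (!isAccumulate && (cnt == null || cnt == 0L)) {
            // Retraction for a key without state (never seen or already cleaned up):
            // exit before the accumulator is modified.
            return null;
        }
        long newCnt = (cnt == null ? 0L : cnt) + (isAccumulate ? 1L : -1L);
        int newSum = sum.getOrDefault(key, 0) + (isAccumulate ? value : -value);
        inputCnt.put(key, newCnt);
        sum.put(key, newSum);
        return newSum;
    }

    // Models the cleanup triggered by the idle state retention timer.
    void cleanupState(String key) {
        inputCnt.remove(key);
        sum.remove(key);
    }

    public static void main(String[] args) {
        RetractionGuardSketch op = new RetractionGuardSketch();
        System.out.println(op.process("aaa", 5, true));   // accumulate
        op.cleanupState("aaa");                           // state expires
        System.out.println(op.process("aaa", 5, false));  // late retraction is ignored
    }
}
```

With this guard, the sequence from the issue description (accumulate, cleanup, retract) leaves the state empty instead of producing a negative count and a null value.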

In the long run, this problem might not occur anymore. 
If all operators implement the state retention interval correctly, all keys 
without updates within some time should have been cleaned before they can send 
a retraction to an operator with cleaned state. A previous operator would not 
be able to send a retraction after the cleanup interval because its own state 
would have been cleaned before (not 100% sure about this, need to think about 
this a bit more). 
This would also mean that each operator has to send out updates even if the 
result does not change (the {{prevRow.row.equals(newRow.row)}} check should be 
removed because it might cause state to be cleaned up too early).
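
One possible reading of that last point, sketched as a toy model: if an operator suppresses unchanged results, the downstream idle timer for that key is not refreshed, so downstream state can expire while the upstream state is still alive. The {{Operator}} class, the retention value of 10 and the timestamps below are assumptions made for illustration only; this is not Flink code.

```java
import java.util.HashMap;
import java.util.Map;

class IdleRetentionSketch {
    // Hypothetical model of one operator's per-key idle timer.
    static class Operator {
        final long retention;
        final Map<String, Long> lastUpdate = new HashMap<>();

        Operator(long retention) {
            this.retention = retention;
        }

        // Every received record refreshes the idle timer of its key.
        void onRecord(String key, long now) {
            lastUpdate.put(key, now);
        }

        // State counts as cleaned once the key has been idle for the retention time.
        boolean hasState(String key, long now) {
            Long last = lastUpdate.get(key);
            return last != null && now - last < retention;
        }
    }

    // Returns true if both operators still hold state for the key at t=16,
    // i.e. when the upstream operator would emit its next retraction.
    static boolean bothAliveAtNextUpdate(boolean forwardUnchanged) {
        Operator upstream = new Operator(10);
        Operator downstream = new Operator(10);

        // t=0: the first input changes the result, so it is always forwarded.
        upstream.onRecord("aaa", 0);
        downstream.onRecord("aaa", 0);

        // t=8: the input leaves the result unchanged.
        upstream.onRecord("aaa", 8);
        if (forwardUnchanged) {
            downstream.onRecord("aaa", 8);
        }

        return upstream.hasState("aaa", 16) && downstream.hasState("aaa", 16);
    }

    public static void main(String[] args) {
        System.out.println(bothAliveAtNextUpdate(true));
        System.out.println(bothAliveAtNextUpdate(false));
    }
}
```

In this model, forwarding the unchanged update keeps both timers aligned, while suppressing it lets the downstream key expire at t=10, before the upstream operator sends its next retraction.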

> Fix Non-windowed group-aggregate error when using `minIdleStateRetentionTime` 
> config and retract agg
> ----------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-7101
>                 URL: https://issues.apache.org/jira/browse/FLINK-7101
>             Project: Flink
>          Issue Type: Bug
>          Components: Table API & SQL
>    Affects Versions: 1.3.0, 1.3.1
>            Reporter: sunjincheng
>            Assignee: sunjincheng
>             Fix For: 1.4.0
>
>
> When a non-windowed group-aggregate uses the {{minIdleStateRetentionTime}} 
> config together with a retract AGG, it emits a "NULL" agg value, which we do 
> not expect. 
> For example: ({{IntSumWithRetractAggFunction}})
> 1. Receive: CRow(Row.of(6L: JLong, 5: JInt, "aaa"), true) 
> 2. Cleanup state
> 3. Receive: CRow(Row.of(6L: JLong, 5: JInt, "aaa"), false)  // acc.f1 = -1, 
> getValue= null 
> So, we must change the logic of {{GroupAggProcessFunction}} as follows:
> {code}
> if (inputCnt != 0) {
>   ...
> } else {
>   ...
> }
> {code}
> TO
> {code}
> if (inputCnt > 0) {
>   ...
> } else {
>   if (null != prevRow.row) {
>     ...
>   }
> }
> {code}
> In this case, the result will be bigger than expected, but I think it makes 
> sense, because the user wants to clean up state (they should know the impact).
> What do you think? [~fhueske] [~hequn8128]



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
