[ https://issues.apache.org/jira/browse/FLINK-7101?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16089622#comment-16089622 ]

ASF GitHub Bot commented on FLINK-7101:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4348#discussion_r127677326
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala ---
    @@ -131,7 +131,8 @@ class GroupAggProcessFunction(
     
           // if this was not the first row and we have to emit retractions
           if (generateRetraction && !firstRow) {
    -        if (prevRow.row.equals(newRow.row)) {
    +        // the condition of !stateCleaningEnabled is avoided state to be cleaned up too early
    --- End diff --
    
    I would move the description into the `if` block and modify it as follows:
    ```
    // newRow is the same as before and state cleaning is not enabled. We do not emit retraction and acc messages.
    // If state cleaning is enabled, we have to emit messages to prevent too early state eviction of downstream operators.
    ```
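The condition the reviewer describes can be modeled as a small pure function. This is a hypothetical sketch, not the code from the PR: `EmitDecision`, `emitFor`, `Retract` and `Accumulate` are illustrative names, and rows are plain `Seq[Any]` values rather than Flink `Row`/`CRow` objects.

```scala
// Hypothetical sketch of the emission rule from the review comment; rows are
// plain Seq[Any] values instead of Flink Row/CRow objects.
object EmitDecision {
  sealed trait Emit
  case object Retract extends Emit    // retraction of the previous result
  case object Accumulate extends Emit // the new (current) result

  def emitFor(
      generateRetraction: Boolean,
      firstRow: Boolean,
      stateCleaningEnabled: Boolean,
      prevRow: Seq[Any],
      newRow: Seq[Any]): Seq[Emit] = {
    if (generateRetraction && !firstRow) {
      if (prevRow == newRow && !stateCleaningEnabled) {
        // newRow is the same as before and state cleaning is not enabled:
        // emit neither a retraction nor an accumulate message.
        Seq.empty
      } else {
        // The result changed, or state cleaning is enabled: retract the old
        // result and emit the new one, so downstream operators keep seeing
        // updates for this key and do not evict its state too early.
        Seq(Retract, Accumulate)
      }
    } else {
      // First row for the key, or retractions disabled: only emit the new result.
      Seq(Accumulate)
    }
  }
}
```

Keeping `stateCleaningEnabled` inside the inner condition preserves the existing optimization (suppressing no-op updates) for jobs that do not use idle-state retention.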


> Fix Non-windowed group-aggregate error when using `minIdleStateRetentionTime` config and retract agg
> ----------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-7101
>                 URL: https://issues.apache.org/jira/browse/FLINK-7101
>             Project: Flink
>          Issue Type: Bug
>          Components: Table API & SQL
>    Affects Versions: 1.3.0, 1.3.1
>            Reporter: sunjincheng
>            Assignee: sunjincheng
>             Fix For: 1.4.0
>
>         Attachments: screenshot-1.png
>
>
> When a non-windowed group aggregate uses the {{minIdleStateRetentionTime}} config
> together with a retractable aggregate function, it emits a "NULL" aggregate value,
> which we do not expect.
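> For example, with {{IntSumWithRetractAggFunction}}:
> 1. Receive: CRow(Row.of(6L: JLong, 5: JInt, "aaa"), true)
> 2. Clean up state
> 3. Receive: CRow(Row.of(6L: JLong, 5: JInt, "aaa"), false)  // acc.f1 = -1, getValue = null
> Because the state was cleaned up, the retraction in step 3 is applied to a freshly
> created accumulator, so its count drops to -1 and {{getValue}} returns null. The input
> counter is also -1, so the {{inputCnt != 0}} branch still emits this null result.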
> So we need to change the logic of {{GroupAggProcessFunction}} from:
> {code}
> if (inputCnt != 0) {
>   ...
> } else {
>   ...
> }
> {code}
> to:
> {code}
> if (inputCnt > 0) {
>   ...
> } else {
>   if (null != prevRow.row) {
>     ...
>   }
> }
> {code}
> In this case the result will be bigger than expected, but I think that makes
> sense, because the user asked for state cleanup (and should know the impact).
> What do you think? [~fhueske] [~hequn8128]
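The scenario and the proposed condition can be reproduced with a simplified model. This is a hypothetical sketch, not Flink's `GroupAggProcessFunction`: `RetentionScenario`, `SumAcc`, `KeyState`, `process` and `cleanup` are illustrative names, `None` stands for SQL NULL, and it assumes (consistent with `acc.f1 = -1, getValue = null` in the issue) that the retractable sum returns NULL when its count is not positive. The unchanged-row check and the retraction flag of the real function are left out.

```scala
// Hypothetical model of FLINK-7101; not Flink's GroupAggProcessFunction.
object RetentionScenario {
  // Mimics a sum-with-retract accumulator: running sum plus row count.
  final class SumAcc {
    var sum: Int = 0
    var count: Long = 0L
  }

  def accumulate(acc: SumAcc, v: Int): Unit = { acc.sum += v; acc.count += 1 }
  def retract(acc: SumAcc, v: Int): Unit = { acc.sum -= v; acc.count -= 1 }

  // Assumption: a non-positive count yields NULL (modeled as None).
  def getValue(acc: SumAcc): Option[Int] =
    if (acc.count > 0) Some(acc.sum) else None

  // Per-key state; None means "not present" (never set or cleaned up).
  final class KeyState {
    var acc: Option[SumAcc] = None
    var inputCnt: Option[Long] = None
  }

  // Simulates idle-state retention cleaning up this key.
  def cleanup(state: KeyState): Unit = {
    state.acc = None
    state.inputCnt = None
  }

  type Msg = (Boolean, Option[Int]) // (isAccumulate, aggregate result)

  // Processes one input and returns the emitted messages.
  // fixed = false: branch on `inputCnt != 0` (behavior described in the issue).
  // fixed = true:  branch on `inputCnt > 0` and only retract a previous result
  //                if one exists (the proposed change).
  def process(state: KeyState, value: Int, isAcc: Boolean, fixed: Boolean): Seq[Msg] = {
    val firstRow = state.acc.isEmpty
    val acc = state.acc.getOrElse(new SumAcc)
    val prevResult = getValue(acc) // result before applying this input
    var inputCnt = state.inputCnt.getOrElse(0L)

    if (isAcc) { accumulate(acc, value); inputCnt += 1 }
    else { retract(acc, value); inputCnt -= 1 }

    val keyHasRows = if (fixed) inputCnt > 0 else inputCnt != 0
    if (keyHasRows) {
      state.acc = Some(acc)
      state.inputCnt = Some(inputCnt)
      val newResult = getValue(acc)
      // Retract the previous result (if any), then emit the new one.
      if (firstRow) Seq((true, newResult))
      else Seq((false, prevResult), (true, newResult))
    } else {
      // No rows left for this key: retract the previous result if there is one
      // and drop the state. (With the old condition this branch is only reached
      // when the counter returns to exactly 0, which implies a previous result.)
      val out: Seq[Msg] = if (firstRow) Seq.empty else Seq((false, prevResult))
      cleanup(state)
      out
    }
  }
}
```

With the old condition, the retraction that arrives after cleanup becomes an accumulate message carrying NULL. With the proposed condition it is dropped, so downstream keeps the earlier result of 5, which is the "bigger than expected" outcome the issue accepts.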



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
