[jira] [Commented] (FLINK-7101) Fix Non-windowed group-aggregate error when using `minIdleStateRetentionTime` config and retract agg
[ https://issues.apache.org/jira/browse/FLINK-7101?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16089913#comment-16089913 ]

ASF GitHub Bot commented on FLINK-7101:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4348

> Fix Non-windowed group-aggregate error when using `minIdleStateRetentionTime` config and retract agg
>
> Key: FLINK-7101
> URL: https://issues.apache.org/jira/browse/FLINK-7101
> Project: Flink
> Issue Type: Bug
> Components: Table API & SQL
> Affects Versions: 1.3.0, 1.3.1
> Reporter: sunjincheng
> Assignee: sunjincheng
> Fix For: 1.4.0
>
> Attachments: screenshot-1.png
>
> When a non-windowed group aggregate is used with the {{minIdleStateRetentionTime}} config and a retractable aggregate function, it emits a "NULL" aggregate value, which we do not expect.
> For example ({{IntSumWithRetractAggFunction}}):
> 1. Receive: CRow(Row.of(6L: JLong, 5: JInt, "aaa"), true)
> 2. Cleanup state
> 3. Receive: CRow(Row.of(6L: JLong, 5: JInt, "aaa"), false) // acc.f1 = -1, getValue = null
> So we must change the logic of {{GroupAggProcessFunction}} as follows:
> {code}
> if (inputCnt != 0) {
>   ...
> } else {
>   ...
> }
> {code}
> TO
> {code}
> if (inputCnt > 0) {
>   ...
> } else {
>   if (null != prevRow.row) {
>     ...
>   }
> }
> {code}
> In this case, the result will be bigger than expected, but I think that makes sense, because the user wants state to be cleaned up (they should know the impact).
> What do you think? [~fhueske] [~hequn8128]

--
This message was sent by Atlassian JIRA (v6.4.14#64029)
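The failure sequence in the issue description can be reproduced with a small, self-contained Scala model. This is a sketch under stated assumptions, not Flink code: `SumAcc`, `RetractSumSketch` and its methods are hypothetical names; the accumulator keeps a running sum (`f0`) and a row count (`f1`); `getValue` yields no value unless the count is positive (as in the built-in `getValue` quoted later in this thread); `Option` stands in for `null`; and only the final result message of each branch is modeled (the retraction of the previous result in the update branch is left out).

```scala
// Hypothetical model, not Flink code: f0 = running sum, f1 = number of rows.
final class SumAcc(var f0: Int = 0, var f1: Long = 0L)

object RetractSumSketch {
  def accumulate(acc: SumAcc, v: Int): Unit = { acc.f0 += v; acc.f1 += 1 }
  def retract(acc: SumAcc, v: Int): Unit = { acc.f0 -= v; acc.f1 -= 1 }

  // No value unless the count is positive; Option stands in for null.
  def getValue(acc: SumAcc): Option[Int] = if (acc.f1 > 0) Some(acc.f0) else None

  // Old branching (simplified): any non-zero count emits the current value,
  // even when there is none; a zero count retracts the previous result.
  def oldOutput(inputCnt: Long, value: Option[Int], prev: Option[Int]): List[String] =
    if (inputCnt != 0) List("+" + value.map(_.toString).getOrElse("NULL"))
    else prev.map(p => "-" + p).toList

  // Proposed branching (simplified): emit only for a positive count; otherwise
  // retract the previous result, but only if one exists.
  def newOutput(inputCnt: Long, value: Option[Int], prev: Option[Int]): List[String] =
    if (inputCnt > 0) value.map(v => "+" + v).toList
    else prev.map(p => "-" + p).toList

  def main(args: Array[String]): Unit = {
    var acc = new SumAcc()
    accumulate(acc, 5)     // 1. receive (6, 5, "aaa") with change = true
    println(getValue(acc)) // Some(5)
    acc = new SumAcc()     // 2. idle-state cleanup: accumulator and previous row are gone
    retract(acc, 5)        // 3. receive (6, 5, "aaa") with change = false
    println(acc.f1)        // -1
    println(oldOutput(acc.f1, getValue(acc), prev = None)) // List(+NULL)
    println(newOutput(acc.f1, getValue(acc), prev = None)) // List()
  }
}
```

With the old `inputCnt != 0` branch the cleaned-up key produces a `NULL` result; with the proposed `inputCnt > 0` branch and no previous row, nothing is emitted.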
[jira] [Commented] (FLINK-7101) Fix Non-windowed group-aggregate error when using `minIdleStateRetentionTime` config and retract agg
[ https://issues.apache.org/jira/browse/FLINK-7101?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16089877#comment-16089877 ]

ASF GitHub Bot commented on FLINK-7101:
---

Github user sunjincheng121 commented on the issue:

https://github.com/apache/flink/pull/4348

Hi @fhueske, thanks for the review. I'll address the description and merge this PR.
[jira] [Commented] (FLINK-7101) Fix Non-windowed group-aggregate error when using `minIdleStateRetentionTime` config and retract agg
[ https://issues.apache.org/jira/browse/FLINK-7101?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16089875#comment-16089875 ]

ASF GitHub Bot commented on FLINK-7101:
---

Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/4348#discussion_r127715580

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala ---
@@ -131,7 +131,8 @@ class GroupAggProcessFunction(
     // if this was not the first row and we have to emit retractions
     if (generateRetraction && !firstRow) {
-      if (prevRow.row.equals(newRow.row)) {
+      // the condition of !stateCleaningEnabled is avoided state to be cleaned up too early
--- End diff --

Makes sense to me.
[jira] [Commented] (FLINK-7101) Fix Non-windowed group-aggregate error when using `minIdleStateRetentionTime` config and retract agg
[ https://issues.apache.org/jira/browse/FLINK-7101?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16089625#comment-16089625 ]

Fabian Hueske commented on FLINK-7101:
--

Hi [~sunjincheng121],

does [PR #4348|https://github.com/apache/flink/pull/4348] address the original problem of this JIRA, i.e., retraction messages after state cleanup, or do we need additional changes to prevent this?

Thanks, Fabian
[jira] [Commented] (FLINK-7101) Fix Non-windowed group-aggregate error when using `minIdleStateRetentionTime` config and retract agg
[ https://issues.apache.org/jira/browse/FLINK-7101?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16089622#comment-16089622 ]

ASF GitHub Bot commented on FLINK-7101:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4348#discussion_r127677326

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala ---
@@ -131,7 +131,8 @@ class GroupAggProcessFunction(
     // if this was not the first row and we have to emit retractions
     if (generateRetraction && !firstRow) {
-      if (prevRow.row.equals(newRow.row)) {
+      // the condition of !stateCleaningEnabled is avoided state to be cleaned up too early
--- End diff --

I would move the description into the `if` block and modify it as follows:
```
// newRow is the same as before and state cleaning is not enabled. We do not emit retraction and acc message.
// If state cleaning is enabled, we have to emit messages to prevent too early state eviction of downstream operators.
```
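The guard described by the suggested comment can be modeled in a few lines. This is a hypothetical helper, not the actual `GroupAggProcessFunction` code: messages are plain strings, and the sketch only shows that an unchanged result is suppressed when state cleaning is disabled, but re-emitted when it is enabled, so that downstream operators keep seeing updates for the key and do not evict its state too early.

```scala
// Hypothetical helper, not Flink code: models the check
// `prevRow.row.equals(newRow.row) && !stateCleaningEnabled`.
object UnchangedResultGuard {
  // Messages emitted for one key when its aggregate result is recomputed.
  def messages[T](prev: T, next: T, stateCleaningEnabled: Boolean): List[String] =
    if (prev == next && !stateCleaningEnabled) Nil // unchanged result: emit nothing
    else List(s"retract $prev", s"accumulate $next") // changed, or downstream state must see activity

  def main(args: Array[String]): Unit = {
    println(messages(5, 5, stateCleaningEnabled = false)) // List()
    println(messages(5, 5, stateCleaningEnabled = true))  // List(retract 5, accumulate 5)
    println(messages(5, 8, stateCleaningEnabled = false)) // List(retract 5, accumulate 8)
  }
}
```

The cost of the design is extra traffic: with state cleaning enabled, every recomputation of an unchanged result still produces a retraction/accumulate pair.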
[jira] [Commented] (FLINK-7101) Fix Non-windowed group-aggregate error when using `minIdleStateRetentionTime` config and retract agg
[ https://issues.apache.org/jira/browse/FLINK-7101?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16088576#comment-16088576 ]

ASF GitHub Bot commented on FLINK-7101:
---

GitHub user sunjincheng121 opened a pull request:

https://github.com/apache/flink/pull/4348

[FLINK-7101][table] add condition of !stateCleaningEnabled is avoided…

- [x] General
  - The pull request references the related JIRA issue ("[FLINK-7101][table] add condition of !stateCleaningEnabled is avoided non-grouped window state to be cleaned up too early")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the JIRA id)

- [ ] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [ ] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis build has passed

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/sunjincheng121/flink FLINK-7101-PR

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4348.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

This closes #4348

commit 1c006299c8b8a169e346ef776cf9c5f5d7af20e0
Author: sunjincheng121
Date: 2017-07-15T11:43:30Z

[FLINK-7101][table] add condition of !stateCleaningEnabled is avoided non-grouped window state to be cleaned up too early
[jira] [Commented] (FLINK-7101) Fix Non-windowed group-aggregate error when using `minIdleStateRetentionTime` config and retract agg
[ https://issues.apache.org/jira/browse/FLINK-7101?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16074330#comment-16074330 ]

sunjincheng commented on FLINK-7101:

Hi [~fhueske], I think:
1. We need the retraction records if {{inputCnt == 0}};
2. For the current Table API/SQL, we should ignore retraction records if {{inputCnt < 0}} (which happens after state cleanup);
3. You are right, we should change the condition {{if (prevRow.row.equals(newRow.row))}} to {{if (prevRow.row.equals(newRow.row) && !stateCleaningEnabled)}}.

BTW, if we can set the parallelism of individual operators (in the future), we also need to change the getValue logic of the current built-in {{SumWithRetractAggFunction}}:
{code}
override def getValue(acc: SumWithRetractAccumulator[T]): T = {
  if (acc.f1 > 0) { // proposed change: acc.f1 != 0
    acc.f0
  } else {
    null.asInstanceOf[T]
  }
}
{code}
The reason is: !screenshot-1.png!

What do you think?
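Points 1 and 2 and the proposed `getValue` change can be sketched as below. Assumptions: `CountRouting` and its members are hypothetical names, not Flink classes; `inputCnt` is taken to be the key's row count after the current record has been applied; `Option` stands in for `null`. The motivating case for `!= 0` is in the attached screenshot, which is not reproduced here, so the sketch only shows how the two conditions differ for a negative count.

```scala
// Hypothetical names, not Flink code. inputCnt is assumed to be the row
// count of the key after the current record has been applied.
object CountRouting {
  sealed trait Action
  case object EmitUpdate extends Action      // group still has rows
  case object RetractPrevious extends Action // group just became empty (point 1)
  case object Ignore extends Action          // count below zero: treated as a retraction after cleanup (point 2)

  def route(inputCnt: Long): Action =
    if (inputCnt > 0) EmitUpdate
    else if (inputCnt == 0) RetractPrevious
    else Ignore

  // Current getValue condition vs. the proposed one; Option stands in for null.
  def getValueCurrent(sum: Int, count: Long): Option[Int] = if (count > 0) Some(sum) else None
  def getValueProposed(sum: Int, count: Long): Option[Int] = if (count != 0) Some(sum) else None

  def main(args: Array[String]): Unit = {
    println(List(2L, 0L, -1L).map(route)) // List(EmitUpdate, RetractPrevious, Ignore)
    println(getValueCurrent(-5, -1L))     // None
    println(getValueProposed(-5, -1L))    // Some(-5)
  }
}
```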
[jira] [Commented] (FLINK-7101) Fix Non-windowed group-aggregate error when using `minIdleStateRetentionTime` config and retract agg
[ https://issues.apache.org/jira/browse/FLINK-7101?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16073795#comment-16073795 ]

Fabian Hueske commented on FLINK-7101:
--

That's a good find [~sunjincheng121] and a tricky question.

I think the best strategy is to completely ignore retraction records if {{inputCnt == 0}} or {{inputCnt == null}}. So I would actually exit the method before we modify the accumulator.

In the long run, this problem might not occur anymore. If all operators implement the state retention interval correctly, all keys without updates within some time should have been cleaned up before they can send a retraction to an operator with cleaned state. A previous operator would not be able to send a retraction after the cleanup interval because it would have had to be cleaned up before (not 100% sure about this, need to think about this a bit more). This would also mean that each operator has to send out updates even if the result does not change (the {{prevRow.row.equals(newRow.row)}} check should be removed because it might cause state to be cleaned up too early).