[jira] [Commented] (FLINK-7101) Fix Non-windowed group-aggregate error when using `minIdleStateRetentionTime` config and retract agg

2017-07-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7101?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16089913#comment-16089913
 ] 

ASF GitHub Bot commented on FLINK-7101:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4348


> Fix Non-windowed group-aggregate error when using `minIdleStateRetentionTime` 
> config and retract agg
> 
>
> Key: FLINK-7101
> URL: https://issues.apache.org/jira/browse/FLINK-7101
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.3.0, 1.3.1
>Reporter: sunjincheng
>Assignee: sunjincheng
> Fix For: 1.4.0
>
> Attachments: screenshot-1.png
>
>
> When a non-windowed group-aggregate uses the {{minIdleStateRetentionTime}} config
> together with a retract AGG, it emits a "NULL" agg value, which we do not expect.
> For example ({{IntSumWithRetractAggFunction}}):
> 1. Receive: CRow(Row.of(6L: JLong, 5: JInt, "aaa"), true)
> 2. Cleanup state
> 3. Receive: CRow(Row.of(6L: JLong, 5: JInt, "aaa"), false)  // acc.f1 = -1, getValue = null
> So, we must change the logic of {{GroupAggProcessFunction}} as follows:
> {code}
> if (inputCnt != 0) {
>  ...
> } else {
>  ...
> }
> {code}
> TO
> {code}
> if (inputCnt > 0) {
>  ...
> } else {
>  if (null != prevRow.row) {
>   ...
>  }
> }
> {code}
> In this case, the result will be bigger than expected, but I think it makes
> sense, because the user wants to clean up state (they should know the impact).
> What do you think? [~fhueske] [~hequn8128]
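
The three steps in the description can be replayed with a minimal Scala sketch. `SumAcc` and the helper names are hypothetical stand-ins, not Flink's actual `IntSumWithRetractAggFunction`; only the `acc.f1 > 0` check in `getValue` follows the code quoted in this thread, and `None` stands in for the SQL NULL result.

```scala
// Hypothetical stand-in for a sum-with-retract accumulator:
// f0 holds the running sum, f1 the number of accumulated rows.
final case class SumAcc(var f0: Int = 0, var f1: Long = 0L)

object RetractAfterCleanupSketch {
  def accumulate(acc: SumAcc, v: Int): Unit = { acc.f0 += v; acc.f1 += 1 }
  def retract(acc: SumAcc, v: Int): Unit = { acc.f0 -= v; acc.f1 -= 1 }

  // Same shape as the `acc.f1 > 0` check discussed in the thread;
  // None plays the role of the unexpected NULL aggregate value.
  def getValue(acc: SumAcc): Option[Int] =
    if (acc.f1 > 0) Some(acc.f0) else None

  // Replays steps 1-3 and returns the accumulator seen by step 3.
  def replay(): SumAcc = {
    var acc = SumAcc()
    accumulate(acc, 5) // 1. accumulate message for value 5
    acc = SumAcc()     // 2. state cleanup: the old accumulator is gone
    retract(acc, 5)    // 3. retraction for a row the operator no longer knows
    acc                // acc.f1 is now -1, so getValue yields None
  }
}
```

Under these assumptions the retraction in step 3 runs against a fresh accumulator, which is why the count drops below zero and the aggregate value comes out as NULL.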



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7101) Fix Non-windowed group-aggregate error when using `minIdleStateRetentionTime` config and retract agg

2017-07-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7101?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16089877#comment-16089877
 ] 

ASF GitHub Bot commented on FLINK-7101:
---

Github user sunjincheng121 commented on the issue:

https://github.com/apache/flink/pull/4348
  
Hi @fhueske, thanks for the review. I'll update the description and merge 
this PR.




[jira] [Commented] (FLINK-7101) Fix Non-windowed group-aggregate error when using `minIdleStateRetentionTime` config and retract agg

2017-07-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7101?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16089875#comment-16089875
 ] 

ASF GitHub Bot commented on FLINK-7101:
---

Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/4348#discussion_r127715580
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala
 ---
@@ -131,7 +131,8 @@ class GroupAggProcessFunction(
 
   // if this was not the first row and we have to emit retractions
   if (generateRetraction && !firstRow) {
-if (prevRow.row.equals(newRow.row)) {
+// the condition of !stateCleaningEnabled is avoided state to be cleaned up too early
--- End diff --

Makes sense to me.




[jira] [Commented] (FLINK-7101) Fix Non-windowed group-aggregate error when using `minIdleStateRetentionTime` config and retract agg

2017-07-17 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7101?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16089625#comment-16089625
 ] 

Fabian Hueske commented on FLINK-7101:
--

Hi [~sunjincheng121], does [PR #4348|https://github.com/apache/flink/pull/4348] 
address the original problem of this JIRA, i.e., retraction messages arriving after 
state cleanup, or do we need additional changes to prevent this?

Thanks, Fabian



[jira] [Commented] (FLINK-7101) Fix Non-windowed group-aggregate error when using `minIdleStateRetentionTime` config and retract agg

2017-07-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7101?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16089622#comment-16089622
 ] 

ASF GitHub Bot commented on FLINK-7101:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4348#discussion_r127677326
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala
 ---
@@ -131,7 +131,8 @@ class GroupAggProcessFunction(
 
   // if this was not the first row and we have to emit retractions
   if (generateRetraction && !firstRow) {
-if (prevRow.row.equals(newRow.row)) {
+// the condition of !stateCleaningEnabled is avoided state to be cleaned up too early
--- End diff --

I would move the description into the `if` block and modify it as follows:
```
// newRow is the same as before and state cleaning is not enabled. We do
// not emit retraction and acc message.
// If state cleaning is enabled, we have to emit messages to prevent too
// early state eviction of downstream operators.
```
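
To make the two cases in this comment concrete, here is a small Scala sketch of the emit decision. It is only an illustration under assumptions: `Msg`, `RetractMsg`, `AccMsg` and `messagesToEmit` are hypothetical names, rows are plain strings, and the condition is the `equals(...) && !stateCleaningEnabled` check discussed in this thread rather than the exact code of `GroupAggProcessFunction`.

```scala
object EmitDecisionSketch {
  // Hypothetical message types standing in for retraction and accumulate records.
  sealed trait Msg
  final case class RetractMsg(row: String) extends Msg
  final case class AccMsg(row: String) extends Msg

  // Returns the messages to send downstream for a non-first row.
  def messagesToEmit(prevRow: String, newRow: String, stateCleaningEnabled: Boolean): List[Msg] =
    if (prevRow == newRow && !stateCleaningEnabled) {
      // Result unchanged and no state cleaning: nothing needs to be emitted.
      Nil
    } else {
      // Either the result changed, or state cleaning is enabled and downstream
      // operators must keep seeing updates so they do not evict state too early.
      List(RetractMsg(prevRow), AccMsg(newRow))
    }
}
```

With state cleaning enabled, an unchanged result still produces a retract/accumulate pair, which is the trade-off the suggested comment describes.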




[jira] [Commented] (FLINK-7101) Fix Non-windowed group-aggregate error when using `minIdleStateRetentionTime` config and retract agg

2017-07-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7101?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16088576#comment-16088576
 ] 

ASF GitHub Bot commented on FLINK-7101:
---

GitHub user sunjincheng121 opened a pull request:

https://github.com/apache/flink/pull/4348

[FLINK-7101][table] add condition of !stateCleaningEnabled is avoided…

- [x] General
  - The pull request references the related JIRA issue ("
[FLINK-7101][table] add condition of !stateCleaningEnabled is avoided 
non-grouped window state to be cleaned up too early")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [ ] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [ ] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/sunjincheng121/flink FLINK-7101-PR

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4348.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4348


commit 1c006299c8b8a169e346ef776cf9c5f5d7af20e0
Author: sunjincheng121 
Date:   2017-07-15T11:43:30Z

[FLINK-7101][table] add condition of !stateCleaningEnabled is avoided 
non-grouped window state to be cleaned up too early






[jira] [Commented] (FLINK-7101) Fix Non-windowed group-aggregate error when using `minIdleStateRetentionTime` config and retract agg

2017-07-05 Thread sunjincheng (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7101?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16074330#comment-16074330
 ] 

sunjincheng commented on FLINK-7101:


Hi [~fhueske], I think:
1. We need the retraction records if {{inputCnt == 0}};
2. For the current Table API/SQL, we should ignore retraction records if 
{{inputCnt < 0}} (which happens after state cleanup);
3. You are right, we should change the condition 
{{if (prevRow.row.equals(newRow.row))}} to 
{{if (prevRow.row.equals(newRow.row) && !stateCleaningEnabled)}}.

BTW, if we can set the parallelism of an operator (in the future), we also need to 
change the getValue logic of the current built-in {{SumWithRetractAggFunction}}:
{code}
override def getValue(acc: SumWithRetractAccumulator[T]): T = {
  if (acc.f1 > 0) { // proposed change: acc.f1 != 0
    acc.f0
  } else {
    null.asInstanceOf[T]
  }
}
{code}
The reason is:
!screenshot-1.png!

What do you think?
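
Points 1 and 2 above can be summarised as a three-way decision on `inputCnt`. The following Scala sketch is one possible reading of that proposal, not the merged code: the `Action` type and `actionFor` are hypothetical, and "emit the result" for a positive count is an assumption about the branch that the issue description elides with `...`.

```scala
object InputCntCasesSketch {
  // Hypothetical outcome of processing one record for a grouping key.
  sealed trait Action
  case object EmitResult extends Action     // the group still has rows
  case object EmitRetraction extends Action // the last row of the group was retracted
  case object Ignore extends Action         // retraction arrived after state cleanup

  def actionFor(inputCnt: Long): Action =
    if (inputCnt > 0) EmitResult
    else if (inputCnt == 0) EmitRetraction
    else Ignore
}
```

For instance, `actionFor(-1)` corresponds to step 3 of the issue description, where a retraction reaches an operator whose state has already been cleaned up.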



[jira] [Commented] (FLINK-7101) Fix Non-windowed group-aggregate error when using `minIdleStateRetentionTime` config and retract agg

2017-07-04 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7101?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16073795#comment-16073795
 ] 

Fabian Hueske commented on FLINK-7101:
--

That's a good find [~sunjincheng121] and a tricky question.

I think the best strategy is to completely ignore retraction records if 
{{inputCnt == 0}} or {{inputCnt == null}}. 
So I would actually exit the method before we modify the accumulator.

In the long run, this problem might not occur anymore. 
If all operators implement the state retention interval correctly, all keys 
without updates within some time should have been cleaned before they can send 
a retraction to an operator with cleaned state. A previous operator would not 
be able to send a retraction after the cleanup interval because its own state 
would already have been cleaned (not 100% sure about this, need to think about 
this a bit more). 
This would also mean that each operator has to send out updates even if the 
result does not change (the {{prevRow.row.equals(newRow.row)}} check should be 
removed because it might cause state to be cleaned up too early).
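
The "exit before we modify the accumulator" strategy could look roughly like the guard below. This is a sketch under assumptions: `shouldIgnore` is a hypothetical helper, `inputCnt` is modelled as an `Option[Long]` where `None` plays the role of `inputCnt == null`, and `change` follows the CRow examples in the issue description, where `true` is an accumulate message and `false` a retraction.

```scala
object IgnoreStaleRetractionSketch {
  // Returns true when the record should be dropped before the accumulator is touched:
  // it is a retraction and the operator has no rows recorded for this key,
  // either because the key was never seen or because its state was cleaned up.
  def shouldIgnore(change: Boolean, inputCnt: Option[Long]): Boolean =
    !change && inputCnt.forall(_ == 0L)
}
```

A process function would call such a guard at the top of its per-record logic and return early when it yields `true`, so a late retraction never drives the count negative.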
