[jira] [Commented] (FLINK-30208) avoid unconditional state update in CountTrigger#onElement

2023-02-09 Thread Weijie Guo (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30208?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17686313#comment-17686313
 ] 

Weijie Guo commented on FLINK-30208:


[~xljtswf] What is the status of this ticket? Do you have a performance test 
report?

> avoid unconditional state update in CountTrigger#onElement
> --
>
> Key: FLINK-30208
> URL: https://issues.apache.org/jira/browse/FLINK-30208
> Project: Flink
>  Issue Type: Improvement
>  Components: API / DataStream
>Reporter: xljtswf
>Priority: Major
>
> In the current CountTrigger#onElement, the state is updated unconditionally 
> whenever an element is received, and we then fetch the state again to check 
> whether it needs to be cleared. This means we may update the state twice to 
> process one element. I propose the following simplification:
> public TriggerResult onElement(Object element, long timestamp, W window,
>         TriggerContext ctx) throws Exception {
>     TriggerResult triggerResult;
>     if (maxCount > 1) {
>         ReducingState<Long> countState = ctx.getPartitionedState(stateDesc);
>         Long currentCount = countState.get();
>         if (currentCount == null || currentCount < maxCount - 1) {
>             // Not the last element of the window yet: count it and continue.
>             countState.add(1L);
>             triggerResult = TriggerResult.CONTINUE;
>         } else {
>             // This element completes the window: reset the count and fire.
>             countState.clear();
>             triggerResult = TriggerResult.FIRE;
>         }
>     } else {
>         // maxCount <= 1: fire on every element without touching the state.
>         triggerResult = TriggerResult.FIRE;
>     }
>     return triggerResult;
> }
> If this is approved, I will open a PR.
> Thanks!
>  
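
To make the difference in state accesses concrete, here is a minimal, Flink-free sketch. The `State` class is a hypothetical stand-in for `ReducingState<Long>` that only counts reads and writes, and `currentOnElement` follows the behaviour as described in the issue text (add, read back, clear on fire), not a copy of the actual Flink source. Returning `true` stands for FIRE and `false` for CONTINUE.

```java
final class CountTriggerSketch {
    /** Hypothetical stand-in for ReducingState<Long>; it only counts accesses. */
    static final class State {
        private Long value; // null means "empty", as after clear()
        int reads;
        int writes;

        Long get() { reads++; return value; }
        void add(long delta) { writes++; value = (value == null) ? delta : value + delta; }
        void clear() { writes++; value = null; }
    }

    /** Behaviour described in the issue: add unconditionally, read back, clear on fire. */
    static boolean currentOnElement(State state, long maxCount) {
        state.add(1L);
        if (state.get() >= maxCount) {
            state.clear();
            return true;  // FIRE
        }
        return false;     // CONTINUE
    }

    /** Proposed logic: read first, then either add or clear, never both. */
    static boolean proposedOnElement(State state, long maxCount) {
        if (maxCount <= 1) {
            return true;  // fire on every element, no state access at all
        }
        Long current = state.get();
        if (current == null || current < maxCount - 1) {
            state.add(1L);
            return false; // CONTINUE
        }
        state.clear();
        return true;      // FIRE
    }

    public static void main(String[] args) {
        long maxCount = 3;
        State a = new State();
        State b = new State();
        for (int i = 1; i <= 6; i++) {
            boolean firedCurrent = currentOnElement(a, maxCount);
            boolean firedProposed = proposedOnElement(b, maxCount);
            if (firedCurrent != firedProposed) {
                throw new AssertionError("results diverged at element " + i);
            }
        }
        System.out.println("current:  reads=" + a.reads + " writes=" + a.writes);
        System.out.println("proposed: reads=" + b.reads + " writes=" + b.writes);
    }
}
```

With `maxCount = 3` and six elements, both variants fire on the third and sixth element. The sketch counts 6 reads and 8 writes for the current logic and 6 reads and 6 writes for the proposed one, i.e. one write saved per fired window. Whether that is measurable on a real state backend is exactly what a benchmark would have to show.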



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-30208) avoid unconditional state update in CountTrigger#onElement

2022-12-22 Thread Yingjie Cao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30208?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17651155#comment-17651155
 ] 

Yingjie Cao commented on FLINK-30208:
-

[~xljtswf] Thanks for your explanation. I think there is no compatibility 
issue. But I am not sure about the performance gain. Do you have any test 
numbers?



[jira] [Commented] (FLINK-30208) avoid unconditional state update in CountTrigger#onElement

2022-12-08 Thread xljtswf (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30208?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17644784#comment-17644784
 ] 

xljtswf commented on FLINK-30208:
-

[~kevin.cyj] Yes, that is true in most cases.
Still, there are two cases we can improve:
1. When maxCount == 1, i.e. we trigger on every element, we do not need to use 
the state at all, so there is no get and no update. stateDesc also does not 
need to be instantiated, which saves memory in its serialized form. However, 
this changes the serialized bytes of CountTrigger, and I do not know whether 
that would break backward compatibility.
2. When maxCount > 1, we save one add operation on the last element.
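
The serialization concern in point 1 can be illustrated with plain Java serialization. This is only a sketch under assumptions: `SketchTrigger` and `Descriptor` are hypothetical classes, not Flink's `CountTrigger` or `ReducingStateDescriptor`, and the example assumes the trigger is shipped with standard `java.io` serialization. It shows that leaving the descriptor field null changes the serialized bytes while the class shape stays the same, so the shorter form still deserializes.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

final class TriggerSerializationSketch {
    /** Hypothetical stand-in for a state descriptor. */
    static final class Descriptor implements Serializable {
        private static final long serialVersionUID = 1L;
        final String name;

        Descriptor(String name) { this.name = name; }
    }

    /** Hypothetical trigger that skips the descriptor when maxCount == 1. */
    static final class SketchTrigger implements Serializable {
        private static final long serialVersionUID = 1L;
        final long maxCount;
        final Descriptor stateDesc; // null when no state is needed

        SketchTrigger(long maxCount) {
            this.maxCount = maxCount;
            this.stateDesc = maxCount > 1 ? new Descriptor("count") : null;
        }
    }

    static byte[] serialize(Object o) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(o);
        }
        return bytes.toByteArray();
    }

    static Object deserialize(byte[] data) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        byte[] withoutDesc = serialize(new SketchTrigger(1));
        byte[] withDesc = serialize(new SketchTrigger(5));
        System.out.println("maxCount=1 bytes: " + withoutDesc.length);
        System.out.println("maxCount=5 bytes: " + withDesc.length);

        // Same class, same serialVersionUID: the smaller form reads back fine.
        SketchTrigger restored = (SketchTrigger) deserialize(withoutDesc);
        System.out.println("restored maxCount=" + restored.maxCount);
    }
}
```

In this sketch the bytes differ only because a field value is null, not because the class definition changed. Whether the same holds for the real CountTrigger depends on how the change is implemented there.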



[jira] [Commented] (FLINK-30208) avoid unconditional state update in CountTrigger#onElement

2022-12-08 Thread Yingjie Cao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30208?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17644663#comment-17644663
 ] 

Yingjie Cao commented on FLINK-30208:
-

After the change, the state will still be accessed twice (get and update) in 
most cases. Am I understanding this correctly?
