[jira] [Commented] (FLINK-30217) Use ListState#update() to replace clear + add mode.

2022-12-08 Thread xljtswf (Jira)


[ https://issues.apache.org/jira/browse/FLINK-30217?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17644794#comment-17644794 ]

xljtswf commented on FLINK-30217:
-

For keyed state, I found that this clear + add pattern runs almost every time in
org.apache.flink.streaming.runtime.operators.windowing.MergingWindowSet#persist:
whenever an element arrives with a timestamp later than every element in the
session window, the mapping changes and has to be persisted again.
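A minimal sketch of what this costs in the persist case, assuming a hypothetical in-memory WriteCountingListState that only counts writes (it is not Flink's ListState), with windows modeled as strings and mapping entries modeled as Map.Entry pairs; persistWithClearAdd and persistWithUpdate are illustrative names, not Flink methods. Whenever the mapping has changed, the clear + add variant performs one write per entry plus one clear, while the update() variant performs a single write.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory list state that only counts writes; not Flink's ListState.
class WriteCountingListState<T> {
    private List<T> values = new ArrayList<>();
    int writes = 0;

    void clear() { values = new ArrayList<>(); writes++; }

    void add(T value) { values.add(value); writes++; }

    void update(List<T> newValues) { values = new ArrayList<>(newValues); writes++; }

    List<T> get() { return values; }
}

class PersistSketch {
    // clear + add: one clear, then one add per (window -> state window) entry.
    static void persistWithClearAdd(Map<String, String> mapping, Map<String, String> initialMapping,
                                    WriteCountingListState<Map.Entry<String, String>> state) {
        if (!mapping.equals(initialMapping)) {
            state.clear();
            for (Map.Entry<String, String> e : mapping.entrySet()) {
                state.add(Map.entry(e.getKey(), e.getValue()));
            }
        }
    }

    // update(): build the complete list first, then write it with a single call.
    static void persistWithUpdate(Map<String, String> mapping, Map<String, String> initialMapping,
                                  WriteCountingListState<Map.Entry<String, String>> state) {
        if (!mapping.equals(initialMapping)) {
            List<Map.Entry<String, String>> entries = new ArrayList<>();
            for (Map.Entry<String, String> e : mapping.entrySet()) {
                entries.add(Map.entry(e.getKey(), e.getValue()));
            }
            state.update(entries);
        }
    }
}
```

With a changed mapping of 3 entries, the first variant records 4 writes and the second records 1; an unchanged mapping records no writes in either.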

> Use ListState#update() to replace clear + add mode.
> ---
>
>         Key: FLINK-30217
>         URL: https://issues.apache.org/jira/browse/FLINK-30217
>     Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>    Reporter: xljtswf
>    Priority: Major
>
> When using ListState, I have found that we often need to clear the current
> state and then add new values. This is especially common in
> CheckpointedFunction#snapshotState, and it is slower than simply calling
> ListState#update().
> Suppose we want to update the ListState to contain value1, value2, and value3.
> With the current implementation, we first call ListState#clear(), which
> updates the state once.
> Then we add value1, value2, and value3 to the state.
> With heap state, we need to look up the stateTable 3 times and add 3 values to
> the list.
> This happens in memory, so it is not too costly.
> With RocksDB, we call backend.db.merge() 3 times.
> In total, we update the state 4 times.
> The more values we add, the more times we update the state.
> With ListState#update(), we only need to update the state once. I think this
> can save a lot of time.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-30208) avoid unconditional state update in CountTrigger#onElement

2022-12-08 Thread xljtswf (Jira)


[ https://issues.apache.org/jira/browse/FLINK-30208?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17644784#comment-17644784 ]

xljtswf commented on FLINK-30208:
-

[~kevin.cyj] Yes, this covers most cases.
In fact, there are 2 cases we can improve:
1. When maxCount == 1, i.e. we fire on every element, we do not need the state
at all: there is no get and no update, and stateDesc does not need to be
instantiated, which saves serialized memory. However, this changes the
serialized bytes of CountTrigger, and I do not know whether it affects backward
compatibility.
2. When maxCount > 1, we can save 1 add operation for the last element (the one
that fires the window).
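To make the two cases concrete, the sketch below counts state accesses over one full cycle of maxCount elements. It assumes a hypothetical CountingReducingState (a nullable Long with a sum reducer, not Flink's ReducingState) in which every get(), add(), and clear() counts as one access; the methods return true for FIRE and false for CONTINUE.

```java
// Hypothetical stand-in for ReducingState<Long> with a sum reducer; every call
// counts as one state access. Not Flink's API.
class CountingReducingState {
    private Long value; // null means empty
    int accesses = 0;

    Long get() { accesses++; return value; }

    void add(long delta) { accesses++; value = (value == null) ? delta : value + delta; }

    void clear() { accesses++; value = null; }
}

class CountTriggerOpCount {
    // Current logic: add unconditionally, read back, clear when maxCount is reached.
    static boolean currentOnElement(CountingReducingState state, long maxCount) {
        state.add(1L);
        if (state.get() >= maxCount) {
            state.clear();
            return true; // FIRE
        }
        return false; // CONTINUE
    }

    // Proposed logic: read first; skip the state entirely when maxCount <= 1.
    static boolean proposedOnElement(CountingReducingState state, long maxCount) {
        if (maxCount > 1) {
            Long currentCount = state.get();
            if (currentCount == null || currentCount < maxCount - 1) {
                state.add(1L);
                return false; // CONTINUE
            }
            state.clear();
            return true; // FIRE
        }
        return true; // FIRE on every element without touching the state
    }

    // Counts state accesses while processing exactly maxCount elements (one firing).
    static int accessesPerCycle(boolean proposed, long maxCount) {
        CountingReducingState state = new CountingReducingState();
        for (long i = 0; i < maxCount; i++) {
            if (proposed) {
                proposedOnElement(state, maxCount);
            } else {
                currentOnElement(state, maxCount);
            }
        }
        return state.accesses;
    }
}
```

Under this model, the current logic costs 2 * maxCount + 1 accesses per cycle, the proposal costs 2 * maxCount when maxCount > 1 (the saved operation is the add on the last element), and 0 when maxCount == 1.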

> avoid unconditional state update in CountTrigger#onElement
> --
>
>         Key: FLINK-30208
>         URL: https://issues.apache.org/jira/browse/FLINK-30208
>     Project: Flink
>  Issue Type: Improvement
>  Components: API / DataStream
>    Reporter: xljtswf
>    Priority: Major
>
> In the current CountTrigger#onElement, the state is updated unconditionally
> whenever an element is received, and then read back to check whether it needs
> to be cleared. This means processing one element may update the state twice.
> I propose the following simplification:
> public TriggerResult onElement(Object element, long timestamp, W window,
>         TriggerContext ctx) throws Exception {
>     TriggerResult triggerResult;
>     if (maxCount > 1) {
>         ReducingState<Long> countState = ctx.getPartitionedState(stateDesc);
>         // Read the count before modifying it.
>         Long currentCount = countState.get();
>         if (currentCount == null || currentCount < maxCount - 1) {
>             // This element does not reach maxCount: increment and keep waiting.
>             countState.add(1L);
>             triggerResult = TriggerResult.CONTINUE;
>         } else {
>             // This element is the maxCount-th one: reset without adding first.
>             countState.clear();
>             triggerResult = TriggerResult.FIRE;
>         }
>     } else {
>         // maxCount <= 1: fire on every element without touching the state.
>         triggerResult = TriggerResult.FIRE;
>     }
>     return triggerResult;
> }
> If this is approved, I will open a PR.
> Thanks!
>  





[jira] [Updated] (FLINK-30217) Use ListState#update() to replace clear + add mode.

2022-11-25 Thread xljtswf (Jira)


[ https://issues.apache.org/jira/browse/FLINK-30217?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

xljtswf updated FLINK-30217:

Component/s: API / DataStream

> Use ListState#update() to replace clear + add mode.
> ---
>
>         Key: FLINK-30217
>         URL: https://issues.apache.org/jira/browse/FLINK-30217
>     Project: Flink
>  Issue Type: Improvement
>  Components: API / DataStream
>    Reporter: xljtswf
>    Priority: Major
>





[jira] [Created] (FLINK-30217) Use ListState#update() to replace clear + add mode.

2022-11-25 Thread xljtswf (Jira)
xljtswf created FLINK-30217:
---

    Summary: Use ListState#update() to replace clear + add mode.
        Key: FLINK-30217
        URL: https://issues.apache.org/jira/browse/FLINK-30217
    Project: Flink
 Issue Type: Improvement
   Reporter: xljtswf


When using ListState, I have found that we often need to clear the current
state and then add new values. This is especially common in
CheckpointedFunction#snapshotState, and it is slower than simply calling
ListState#update().

Suppose we want to update the ListState to contain value1, value2, and value3.
With the current implementation, we first call ListState#clear(), which updates
the state once.
Then we add value1, value2, and value3 to the state.
With heap state, we need to look up the stateTable 3 times and add 3 values to
the list.
This happens in memory, so it is not too costly.
With RocksDB, we call backend.db.merge() 3 times.
In total, we update the state 4 times.
The more values we add, the more times we update the state.

With ListState#update(), we only need to update the state once.
I think this can save a lot of time.
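A minimal sketch of the write counts described above, assuming a hypothetical CountingListState that models a RocksDB-backed list the way this issue describes it: clear() as one delete, each add() as one merge, and update() as one put. The two snapshot helpers stand in for a CheckpointedFunction#snapshotState body that copies an in-memory buffer into list state; they are illustrative, not Flink code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of a RocksDB-backed list state that counts backend writes,
// following the issue's description: clear() ~ 1 delete, add() ~ 1 merge, update() ~ 1 put.
class CountingListState<T> {
    private final List<T> stored = new ArrayList<>();
    int backendWrites = 0;

    void clear() { stored.clear(); backendWrites++; }

    void add(T value) { stored.add(value); backendWrites++; }

    void update(List<T> values) {
        stored.clear();
        stored.addAll(values);
        backendWrites++;
    }

    List<T> get() { return new ArrayList<>(stored); }
}

class SnapshotSketch {
    // Common pattern today: clear the state, then add each buffered value (1 + n writes).
    static <T> void snapshotWithClearAdd(List<T> buffer, CountingListState<T> state) {
        state.clear();
        for (T value : buffer) {
            state.add(value);
        }
    }

    // Proposed pattern: replace the whole list in one update() call (1 write).
    static <T> void snapshotWithUpdate(List<T> buffer, CountingListState<T> state) {
        state.update(buffer);
    }
}
```

For a buffer of value1, value2, and value3, the clear + add path records 4 backend writes and the update() path records 1, and both leave the same list in state.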








[jira] [Created] (FLINK-30208) avoid unconditional state update in CountTrigger#onElement

2022-11-25 Thread xljtswf (Jira)
xljtswf created FLINK-30208:
---

    Summary: avoid unconditional state update in CountTrigger#onElement
        Key: FLINK-30208
        URL: https://issues.apache.org/jira/browse/FLINK-30208
    Project: Flink
 Issue Type: Improvement
 Components: API / DataStream
   Reporter: xljtswf


In the current CountTrigger#onElement, the state is updated unconditionally
whenever an element is received, and then read back to check whether it needs
to be cleared. This means processing one element may update the state twice.
I propose the following simplification:

public TriggerResult onElement(Object element, long timestamp, W window,
        TriggerContext ctx) throws Exception {
    TriggerResult triggerResult;
    if (maxCount > 1) {
        ReducingState<Long> countState = ctx.getPartitionedState(stateDesc);
        // Read the count before modifying it.
        Long currentCount = countState.get();
        if (currentCount == null || currentCount < maxCount - 1) {
            // This element does not reach maxCount: increment and keep waiting.
            countState.add(1L);
            triggerResult = TriggerResult.CONTINUE;
        } else {
            // This element is the maxCount-th one: reset without adding first.
            countState.clear();
            triggerResult = TriggerResult.FIRE;
        }
    } else {
        // maxCount <= 1: fire on every element without touching the state.
        triggerResult = TriggerResult.FIRE;
    }
    return triggerResult;
}

If this is approved, I will open a PR.
Thanks!
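One way to sanity-check that the proposed onElement fires on exactly the same elements as the current one is to run both against an in-memory counter. The sketch assumes a hypothetical LongCounter in place of ReducingState<Long> with a sum reducer, models the current logic as add, then get, then clear once the count reaches maxCount, and returns true for FIRE and false for CONTINUE.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory stand-in for ReducingState<Long> with a sum reducer.
class LongCounter {
    private Long value; // null means empty

    Long get() { return value; }

    void add(long delta) { value = (value == null) ? delta : value + delta; }

    void clear() { value = null; }
}

class CountTriggerEquivalence {
    // Current logic: add, read back, clear and fire once maxCount is reached.
    static boolean current(LongCounter count, long maxCount) {
        count.add(1L);
        if (count.get() >= maxCount) {
            count.clear();
            return true;
        }
        return false;
    }

    // Proposed logic from this issue.
    static boolean proposed(LongCounter count, long maxCount) {
        if (maxCount > 1) {
            Long currentCount = count.get();
            if (currentCount == null || currentCount < maxCount - 1) {
                count.add(1L);
                return false;
            }
            count.clear();
            return true;
        }
        return true;
    }

    // FIRE (true) / CONTINUE (false) decisions for n consecutive elements.
    static List<Boolean> run(boolean useProposed, long maxCount, int n) {
        LongCounter count = new LongCounter();
        List<Boolean> results = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            results.add(useProposed ? proposed(count, maxCount) : current(count, maxCount));
        }
        return results;
    }
}
```

For maxCount = 3 and six elements, both versions produce CONTINUE, CONTINUE, FIRE, CONTINUE, CONTINUE, FIRE.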

 





[jira] [Closed] (FLINK-30048) MapState.remove(UK key) is better to return the old value.

2022-11-21 Thread xljtswf (Jira)


[ https://issues.apache.org/jira/browse/FLINK-30048?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

xljtswf closed FLINK-30048.
---
Resolution: Won't Do

> MapState.remove(UK key) is better to return the old value.
> --
>
>         Key: FLINK-30048
>         URL: https://issues.apache.org/jira/browse/FLINK-30048
>     Project: Flink
>  Issue Type: Improvement
>  Components: API / DataStream, Runtime / State Backends
>    Reporter: xljtswf
>    Priority: Major
>
> Hi all:
> I found that MapState.remove(UK key) returns nothing, whereas the Java Map
> interface returns the old value associated with the key.
> Consider the following example from the Learn Flink Event-driven Applications
> page, [The onTimer() Method|https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/learn-flink/event_driven/#the-ontimer-method]:
> we want to get the value for a timestamp and then delete that timestamp.
> With the current implementation, we must first call mapState.get(key) and
> then call mapState.remove(key), which searches for the key twice. I think the
> second search is unnecessary: if mapState.remove(key) returned the old value,
> we could call mapState.remove(key) once to get the old value and skip the
> extra lookup in the map.





[jira] [Created] (FLINK-30048) MapState.remove(UK key) is better to return the old value.

2022-11-16 Thread xljtswf (Jira)
xljtswf created FLINK-30048:
---

    Summary: MapState.remove(UK key) is better to return the old value.
        Key: FLINK-30048
        URL: https://issues.apache.org/jira/browse/FLINK-30048
    Project: Flink
 Issue Type: Improvement
 Components: API / DataStream
   Reporter: xljtswf


Hi all:

I found that MapState.remove(UK key) returns nothing, whereas the Java Map
interface returns the old value associated with the key.

Consider the following example from the Learn Flink Event-driven Applications
page, [The onTimer() Method|https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/learn-flink/event_driven/#the-ontimer-method]:
we want to get the value for a timestamp and then delete that timestamp. With
the current implementation, we must first call mapState.get(key) and then call
mapState.remove(key), which searches for the key twice. I think the second
search is unnecessary: if mapState.remove(key) returned the old value, we could
call mapState.remove(key) once to get the old value and skip the extra lookup
in the map.
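A minimal sketch of the two access patterns, modeled loosely on the onTimer() example linked above and keyed by a window-end timestamp. It assumes a hypothetical CountingMapState backed by java.util.HashMap that counts key lookups (it is not Flink's MapState): its remove() returns nothing, like MapState#remove today, while removeAndGet() is a hypothetical name for the proposed behavior that simply delegates to java.util.Map#remove, which does return the previous value.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical map-state stand-in that counts key lookups (not Flink's MapState).
class CountingMapState<K, V> {
    private final Map<K, V> map = new HashMap<>();
    int lookups = 0;

    void put(K key, V value) { lookups++; map.put(key, value); }

    V get(K key) { lookups++; return map.get(key); }

    // Like MapState#remove today: deletes the entry but returns nothing.
    void remove(K key) { lookups++; map.remove(key); }

    // Hypothetical proposed variant: delete and return the old value in one lookup,
    // mirroring java.util.Map#remove.
    V removeAndGet(K key) { lookups++; return map.remove(key); }
}

class OnTimerSketch {
    // Current pattern: read the sum for a window end, then delete it (2 lookups).
    static Float emitWithGetThenRemove(CountingMapState<Long, Float> sums, long windowEnd) {
        Float sum = sums.get(windowEnd);
        sums.remove(windowEnd);
        return sum;
    }

    // Proposed pattern: one call deletes the entry and returns the old value (1 lookup).
    static Float emitWithRemoveAndGet(CountingMapState<Long, Float> sums, long windowEnd) {
        return sums.removeAndGet(windowEnd);
    }
}
```

Reading and deleting the sum for one window end costs 2 lookups with get() followed by remove(), and 1 lookup with the single removeAndGet() call.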


