[ 
https://issues.apache.org/jira/browse/FLINK-21413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17287862#comment-17287862
 ] 

Yu Li commented on FLINK-21413:
-------------------------------

Checking the [state TTL FLIP 
document|https://cwiki.apache.org/confluence/display/FLINK/FLIP-25%3A+Support+User+State+TTL+Natively#FLIP25:SupportUserStateTTLNatively-TTLbehaviour],
I cannot find a description of whether TTL for a whole map is supported for 
{{MapState}}, but according to the current implementation the answer is no (TTL 
is only checked against the value of each map entry). What's more, in 
[HeapMapState#remove|https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapMapState.java#L130-L132]
we can see that the whole map is removed once it becomes empty, so I don't think 
{{TtlIncrementalCleanup}} needs to take care of the empty-map case.

Accordingly, I think we should have a fast path in 
{{TtlMapState#getUnexpiredOrNull}} that checks whether {{ttlValue}} is empty 
and, if so, returns it directly (instead of returning {{NULL}}), and that 
returns {{NULL}} if and only if {{ttlValue}} is not empty but all of its values 
have expired ({{unexpired.size()}} is zero).

And similar logic should be applied to {{TtlListState#getUnexpiredOrNull}}.
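
To make the proposal concrete, here is a minimal sketch of the intended control flow in plain Java. It is not the actual Flink code: the class name {{TtlFastPathSketch}}, the method names and the {{expired}} predicate are hypothetical stand-ins, and the defensive copy through the value serializer is left out for brevity.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

/** Sketch only: plain-Java stand-ins, not Flink's TtlMapState/TtlListState. */
final class TtlFastPathSketch {

    /** Map variant; `expired` is a hypothetical stand-in for the TTL check. */
    static <K, V> Map<K, V> unexpiredMapOrNull(Map<K, V> ttlValue, Predicate<V> expired) {
        // Fast path: an empty map has nothing to expire, so hand it back as-is.
        if (ttlValue.isEmpty()) {
            return ttlValue;
        }
        Map<K, V> unexpired = new HashMap<>();
        for (Map.Entry<K, V> e : ttlValue.entrySet()) {
            if (!expired.test(e.getValue())) {
                // The real code would copy the value via its serializer here.
                unexpired.put(e.getKey(), e.getValue());
            }
        }
        // Non-empty input but every entry expired: signal "drop this state entry".
        if (unexpired.isEmpty()) {
            return null;
        }
        // Nothing expired: return the original map to avoid a needless update.
        return unexpired.size() == ttlValue.size() ? ttlValue : unexpired;
    }

    /** List variant with the same three outcomes. */
    static <V> List<V> unexpiredListOrNull(List<V> ttlValues, Predicate<V> expired) {
        if (ttlValues.isEmpty()) {
            return ttlValues;
        }
        List<V> unexpired = new ArrayList<>();
        for (V v : ttlValues) {
            if (!expired.test(v)) {
                unexpired.add(v);
            }
        }
        if (unexpired.isEmpty()) {
            return null;
        }
        return unexpired.size() == ttlValues.size() ? ttlValues : unexpired;
    }
}
```

With a predicate such as {{ts -> ts < 100L}} over timestamp values, an empty map comes back unchanged, a map whose entries are all older than 100 yields {{null}}, and a mixed map yields only the surviving entries.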

Please let me know your thoughts [~wind_ljy]. Thanks.

> TtlMapState and TtlListState cannot be clean completely with Filesystem 
> StateBackend
> ------------------------------------------------------------------------------------
>
>                 Key: FLINK-21413
>                 URL: https://issues.apache.org/jira/browse/FLINK-21413
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / State Backends
>    Affects Versions: 1.9.0
>            Reporter: Jiayi Liao
>            Priority: Major
>         Attachments: image-2021-02-19-11-13-58-672.png
>
>
> Take the #TtlMapState as an example,
>  
> {code:java}
> public Map<UK, TtlValue<UV>> getUnexpiredOrNull(@Nonnull Map<UK, TtlValue<UV>> ttlValue) {
>         Map<UK, TtlValue<UV>> unexpired = new HashMap<>();
>         TypeSerializer<TtlValue<UV>> valueSerializer =
>                 ((MapSerializer<UK, TtlValue<UV>>) original.getValueSerializer()).getValueSerializer();
>         for (Map.Entry<UK, TtlValue<UV>> e : ttlValue.entrySet()) {
>                 if (!expired(e.getValue())) {
>                         // we have to do the defensive copy to update the value
>                         unexpired.put(e.getKey(), valueSerializer.copy(e.getValue()));
>                 }
>         }
>         return ttlValue.size() == unexpired.size() ? ttlValue : unexpired;
> }
> {code}
>  
> The returned value will never be null, so the #StateEntry will exist 
> forever, which leads to a memory leak if the key range of the stream is very 
> large. Below we can see that 20+ million uncleared TtlStateMap instances can 
> take up several GB of memory.
>  
> !image-2021-02-19-11-13-58-672.png!



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
