[ https://issues.apache.org/jira/browse/FLINK-21413?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jiayi Liao updated FLINK-21413:
-------------------------------
    Description: 
Take #TtlMapState as an example:

 
{code:java}
public Map<UK, TtlValue<UV>> getUnexpiredOrNull(@Nonnull Map<UK, TtlValue<UV>> ttlValue) {
        Map<UK, TtlValue<UV>> unexpired = new HashMap<>();
        TypeSerializer<TtlValue<UV>> valueSerializer =
                ((MapSerializer<UK, TtlValue<UV>>) original.getValueSerializer()).getValueSerializer();
        for (Map.Entry<UK, TtlValue<UV>> e : ttlValue.entrySet()) {
                if (!expired(e.getValue())) {
                        // we have to do the defensive copy to update the value
                        unexpired.put(e.getKey(), valueSerializer.copy(e.getValue()));
                }
        }
        return ttlValue.size() == unexpired.size() ? ttlValue : unexpired;
}
{code}
 

The returned value is never null: when every entry has expired, an empty map is
returned instead. As a result, the #StateEntry is never removed and exists
forever, which leads to a memory leak when the key space of the stream is very
large. The image below shows that more than 20 million uncleared TtlStateMap
objects can take up several GB of memory.

 

!image-2021-02-19-11-13-58-672.png!
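A minimal, self-contained Java sketch of the failure mode follows. It is modeled on the method quoted above but has no Flink dependencies. All names in it (TtlCleanupSketch, TtlEntry, currentBehavior, returnNullWhenEmpty) are hypothetical. The expiration check (last access + TTL <= now) is an assumption. The null-on-empty variant is only one possible fix, not necessarily the patch that resolves this issue. The sketch shows that the quoted logic returns a non-null empty map once every entry has expired, so a cleanup that drops an entry only when it receives null would keep that entry.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical, simplified model of the TTL cleanup logic quoted above.
// TtlEntry stands in for Flink's TtlValue; no Flink classes are used.
public class TtlCleanupSketch {

    /** Simplified stand-in for TtlValue: a user value plus its last-access timestamp. */
    static final class TtlEntry<V> {
        final V value;
        final long lastAccessTimestamp;

        TtlEntry(V value, long lastAccessTimestamp) {
            this.value = value;
            this.lastAccessTimestamp = lastAccessTimestamp;
        }
    }

    /** Mirrors the quoted logic: an empty map comes back when all entries expired, never null. */
    static <K, V> Map<K, TtlEntry<V>> currentBehavior(
            Map<K, TtlEntry<V>> ttlValue, Predicate<TtlEntry<V>> expired) {
        Map<K, TtlEntry<V>> unexpired = new HashMap<>();
        for (Map.Entry<K, TtlEntry<V>> e : ttlValue.entrySet()) {
            if (!expired.test(e.getValue())) {
                unexpired.put(e.getKey(), e.getValue()); // defensive copy omitted in this sketch
            }
        }
        return ttlValue.size() == unexpired.size() ? ttlValue : unexpired;
    }

    /** Assumed fix for illustration: signal "nothing left" with null so the entry can be dropped. */
    static <K, V> Map<K, TtlEntry<V>> returnNullWhenEmpty(
            Map<K, TtlEntry<V>> ttlValue, Predicate<TtlEntry<V>> expired) {
        Map<K, TtlEntry<V>> result = currentBehavior(ttlValue, expired);
        return result.isEmpty() ? null : result;
    }

    public static void main(String[] args) {
        long now = 1_000L;
        long ttlMillis = 100L;
        // Assumed expiration rule: expired once lastAccess + ttl <= now.
        Predicate<TtlEntry<String>> expired = v -> v.lastAccessTimestamp + ttlMillis <= now;

        Map<String, TtlEntry<String>> allExpired = new HashMap<>();
        allExpired.put("a", new TtlEntry<>("x", 0L));
        allExpired.put("b", new TtlEntry<>("y", 10L));

        Map<String, TtlEntry<String>> current = currentBehavior(allExpired, expired);
        // Non-null empty map: a cleanup that drops an entry only on null would keep it.
        System.out.println("current: " + (current == null ? "null" : "empty=" + current.isEmpty()));
        System.out.println("fixed:   " + returnNullWhenEmpty(allExpired, expired));
    }
}
```

Running main prints "current: empty=true" followed by "fixed:   null".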

  was:
Take #TtlMapState as an example:

 
{code:java}
public Map<UK, TtlValue<UV>> getUnexpiredOrNull(@Nonnull Map<UK, TtlValue<UV>> ttlValue) {
        Map<UK, TtlValue<UV>> unexpired = new HashMap<>();
        TypeSerializer<TtlValue<UV>> valueSerializer =
                ((MapSerializer<UK, TtlValue<UV>>) original.getValueSerializer()).getValueSerializer();
        for (Map.Entry<UK, TtlValue<UV>> e : ttlValue.entrySet()) {
                if (!expired(e.getValue())) {
                        // we have to do the defensive copy to update the value
                        unexpired.put(e.getKey(), valueSerializer.copy(e.getValue()));
                }
        }
        return ttlValue.size() == unexpired.size() ? ttlValue : unexpired;
}
{code}
 

The returned value is never null: when every entry has expired, an empty map is
returned instead. As a result, the #StateEntry is never removed and exists
forever, which leads to a memory leak when the key space of the stream is very
large. The image below shows that more than 20 million uncleared TtlStateMap
objects can take up several GB of memory.

 

!image-2021-02-19-11-13-38-691.png!


> TtlMapState and TtlListState cannot be cleaned completely with Filesystem StateBackend
> ------------------------------------------------------------------------------------
>
>                 Key: FLINK-21413
>                 URL: https://issues.apache.org/jira/browse/FLINK-21413
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / State Backends
>    Affects Versions: 1.9.0
>            Reporter: Jiayi Liao
>            Priority: Major
>         Attachments: image-2021-02-19-11-13-58-672.png
>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
