Github user fhueske commented on the issue:

    https://github.com/apache/flink/pull/3574
  
    Hi @huawei-flink, let me explain the idea of using `MapState` and its 
benefits in more detail.
    
    I'll start with the way that a `ListState` works. With `ListState` we can 
get efficient access to the head element of the list. However, when updating 
the `ListState`, we cannot remove individual elements but have to clear the 
complete state and reinsert all elements that should remain. Hence we always 
need to deserialize and serialize all elements of a `ListState`.
    
    With the `MapState` approach, we would put the elements in a map which is 
keyed on their processing timestamp. Since multiple records can arrive within 
the same millisecond, we use a `List[Row]` as value type for the map. To 
process a new row, we have to find the "oldest" row (i.e., the one with the 
smallest timestamp) to retract it from the accumulator. With `ListState` this 
is trivial, it is the head element. With `MapState` we have to iterate over the 
keys and find the smallest one (smallest processing timestamp). This requires 
to deserialize all keys, but these are only `Long` values and not complete 
rows. With the smallest key, we can get the `List[Row]` value and take the 
first Row from the list and retract it from the accumulator. When updating the 
state, we only update the `List[Row]` value of the smallest key (or possible 
remove it if the `List[Row]` became empty).
    
    So the benefit of using `MapState` of `ListState` is that we only read `n` 
Long (+ read/write 1 `List[Row]`) instead of reading and writing `n` Row values.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to