GitHub user senorcarbone commented on the issue:

    https://github.com/apache/flink/pull/1668
  
    The last update implements a variant of what @StephanEwen proposes. We put 
some more thought into this offline as well, thanks to @gyfora! The idea is that 
instead of appending every record to a single `ListState`, the output log is 
partitioned into multiple log "slices", one per concurrent checkpoint.
    
    More specifically, the `UpstreamLogger` operator at the `IterationHead` 
slices the log according to the number of concurrent snapshots. This also 
allows committed output logs to be uniquely identified and cleared after each 
completed checkpoint. The design is based on the following assumptions:
    
    - A slice is named after a checkpoint ID. Checkpoint IDs are numerically 
ordered within an execution.
    - Checkpoint barriers arrive back in FIFO order, so log slices are 
discarded in the same FIFO order.
    - Upon restoration, the logger sorts the sliced logs in the same FIFO 
order and returns an `Iterable` that provides a singular view of the log.
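    The scheme above can be sketched roughly as follows. This is a minimal, 
self-contained illustration, not the actual `UpstreamLogger` code: the class 
and method names are hypothetical, and a `TreeMap` stands in for the 
checkpoint-ID-keyed slices so that iteration order matches checkpoint order.

    ```java
    import java.util.ArrayList;
    import java.util.List;
    import java.util.TreeMap;

    // Hypothetical sketch: one log slice per concurrent checkpoint,
    // keyed by checkpoint ID (numerically ordered within an execution).
    public class SlicedLog<T> {
        // TreeMap keeps slices sorted by checkpoint ID, matching FIFO order.
        private final TreeMap<Long, List<T>> slices = new TreeMap<>();

        // Append a record to the slice of the given pending checkpoint.
        public void append(long checkpointId, T record) {
            slices.computeIfAbsent(checkpointId, id -> new ArrayList<>()).add(record);
        }

        // Barriers come back in FIFO order, so once a checkpoint completes
        // its slice (and any earlier ones) can be discarded.
        public void discardUpTo(long completedCheckpointId) {
            slices.headMap(completedCheckpointId, true).clear();
        }

        // On restore: concatenate the remaining slices in checkpoint-ID
        // order, yielding a single logical view of the log.
        public List<T> restoredView() {
            List<T> view = new ArrayList<>();
            slices.values().forEach(view::addAll);
            return view;
        }
    }
    ```

    The `TreeMap` is just one way to get the ordering; the point is that 
naming each slice after its checkpoint ID makes both discarding and the 
restore-time merge trivial.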
    
    Before I polish this we need to close a memory leak. The `clear` operation 
of `State` clears the state under the registered id, but it does not seem to 
unregister the key itself. Does anyone have an idea of how to unregister state 
properly? I hope this gets some attention so we can wrap it up; it's been too 
long :).


