[ 
https://issues.apache.org/jira/browse/FLINK-23251?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Roman Khachatryan updated FLINK-23251:
--------------------------------------
    Description: 
FLINK-23139 adds private state management capabilities to TM.

However, it does not consider multiple retained checkpoints. 

In most cases, it should work correctly:
 # TMs will not discard the state of the previous checkpoints if it's not used 
in the latest one - because they are not aware of it
 # If some state *is* reused (incremental checkpoints), it will be discarded by 
TMs on latest checkpoint subsumption - which means that the previous 
checkpoints are subsumed too

However, JM will also try to discard the state on subsumption (it's not shared 
between TMs).

So the state *will* be removed, and it will *not* be removed prematurely, but 
JM and TMs may race to discard the same state.

--

The simplest way to solve this is to ignore (log) discard errors.
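
A minimal sketch of what "ignore (log) discard errors" could look like. The names here (DiscardableState, OnceDiscardable, discardQuietly) are hypothetical and are not Flink's actual classes; the toy handle simply fails on a second discard to imitate JM and a TM both trying to delete the same state.

```java
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level;
import java.util.logging.Logger;

public class TolerantDiscard {
    private static final Logger LOG = Logger.getLogger(TolerantDiscard.class.getName());

    /** Hypothetical stand-in for a state handle; not Flink's actual interface. */
    public interface DiscardableState {
        void discard() throws Exception;
    }

    /** Toy handle whose second discard fails, as if the state were already deleted. */
    public static final class OnceDiscardable implements DiscardableState {
        private final AtomicBoolean discarded = new AtomicBoolean(false);

        @Override
        public void discard() throws Exception {
            // Only the first caller wins; any later caller sees a failure.
            if (!discarded.compareAndSet(false, true)) {
                throw new IOException("state already discarded");
            }
        }
    }

    /**
     * Attempts the discard and swallows any failure after logging it.
     * Returns true if this call discarded the state, false if an error was ignored.
     */
    public static boolean discardQuietly(DiscardableState state) {
        try {
            state.discard();
            return true;
        } catch (Exception e) {
            // Assumption: a failure here most likely means the other side got there first.
            LOG.log(Level.FINE, "Ignoring discard failure (possibly discarded elsewhere)", e);
            return false;
        }
    }
}
```

With this approach the loser of the race only produces a log entry. The trade-off is that genuine discard failures are hidden as well, so the log level would need some thought.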

Other options include:
 # treat all state after recovery as "distributed", so TMs won't discard it
 # compute the intersection between the checkpoints and pass it to TMs as 
distributed state, so they won't discard it
 # compute the intersection between the checkpoints and prevent JM from 
discarding it
 # pass all recovered snapshots from JM to TM on recovery
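
For the two "intersection" options, a rough sketch of the computation follows. It assumes each retained checkpoint can be reduced to a set of state identifiers (plain strings here), and it reads "intersection" as "state referenced by at least two retained checkpoints"; both are assumptions made for illustration, and the class and method names are hypothetical.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

public class SharedStateIntersection {

    /**
     * Returns the state IDs referenced by at least two of the given checkpoints.
     * Each inner set holds the IDs referenced by one retained checkpoint.
     */
    public static Set<String> sharedAcrossCheckpoints(List<Set<String>> checkpoints) {
        Set<String> seen = new HashSet<>();
        Set<String> shared = new TreeSet<>(); // sorted only to make the result deterministic
        for (Set<String> checkpoint : checkpoints) {
            for (String id : checkpoint) {
                // add() returns false if an earlier checkpoint already referenced this ID
                if (!seen.add(id)) {
                    shared.add(id);
                }
            }
        }
        return shared;
    }
}
```

The resulting set is what would either be handed to TMs as "distributed" state or excluded from JM-side discard, depending on which option is chosen.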

  was:
FLINK-23139 should add private state management capabilities to TM. It does not 
consider multiple retained checkpoints.

From the offline discussions, there seem to be three options:
 # pass all num-retained snapshots from JM to TM on recovery (and increase 
counts accordingly)
 # do ref count on JM on recovery and send these counts to TM
 # store counts externally (so JM is not involved)

Option 1 seems preferable.


> Support more than one retained checkpoints
> ------------------------------------------
>
>                 Key: FLINK-23251
>                 URL: https://issues.apache.org/jira/browse/FLINK-23251
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Runtime / State Backends
>            Reporter: Roman Khachatryan
>            Priority: Minor
>             Fix For: 1.14.0



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
