[ 
https://issues.apache.org/jira/browse/FLINK-23170?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Roman Khachatryan updated FLINK-23170:
--------------------------------------
    Description: 
Currently, the changelog state backend writes state metadata to the changelog 
on first state access.
 On materialization, the changelog can be truncated, so the metadata needs to 
be written again.

 

Below is a proposed solution using the existing metadataWritten flag.

An alternative would be to write metadata at the end of the materialized stream.
 Yet another approach is to write metadata to a separate file (however, this 
seems less optimal than writing at the end of the materialized stream and not 
as easy as writing the metadata again).

There are several questions to answer:
 - *When to mark* the metadata as not written (i.e. reset the metadataWritten 
flag)?
 ** After starting the materialization - so that any subsequent data is 
preceded by metadata
 - *When to request* the write (i.e. call append)?
     At any point (materialization start / materialization end / checkpoint 
start). It doesn't matter for correctness - see the next points.
 Scheduling append earlier means:
 -- including metadata in the changelog twice unnecessarily (won't hurt 
correctness)
 -- writing for nothing if materialization fails

Scheduling append later means slowing down the checkpoint.
 So materialization end seems to be the better tradeoff.
 - *What* metadata to write?
      Only for states that were changed after materialization started (so the 
flag is enough)
 - *Where* in the changelog to write it?
     No choice but at the end of the changelog. Because the SQN is updated, the 
metadata will appear at the beginning of the state object returned by 
persist(sqn) when it is called after materialization completes.
 - *How to wait for write completion* (before completing the checkpoint)?
 Once the metadata is appended, the future returned from the persist() call 
should already include it.
  

So to achieve this, it's enough to call appendMetadata() for each changed state 
upon materialization start, or finish, or the first checkpoint after it.
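
The flag-reset part of the proposal can be illustrated with a small, 
self-contained sketch. It is not Flink code: InMemoryChangelog, LoggedState, 
update(), truncate(), nextSqn() and resetMetadataWritten() are hypothetical 
stand-ins, and only the metadataWritten flag and persist(sqn) echo names used 
above. It models just one thing, assuming metadata is written lazily before 
the next change: after the flag is reset at materialization start, the next 
change is preceded by metadata, so the part of the log that survives 
truncation is still self-describing. The explicit appendMetadata() call and 
its timing are not modeled.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch; none of these types are Flink classes.
final class MetadataRewriteSketch {

    /** In-memory stand-in for the changelog: an append-only list addressed by SQN. */
    static final class InMemoryChangelog {
        private final List<String> entries = new ArrayList<>();
        private int truncatedBefore = 0; // entries with an SQN below this count as gone

        void append(String entry) {
            entries.add(entry);
        }

        /** SQN that the next appended entry will get. */
        int nextSqn() {
            return entries.size();
        }

        void truncate(int sqn) {
            truncatedBefore = Math.max(truncatedBefore, sqn);
        }

        /** Returns the entries still available, starting at fromSqn. */
        List<String> persist(int fromSqn) {
            int start = Math.max(fromSqn, truncatedBefore);
            return new ArrayList<>(entries.subList(start, entries.size()));
        }
    }

    /** A state that writes its metadata lazily, before its first logged change. */
    static final class LoggedState {
        private final String name;
        private final InMemoryChangelog log;
        private boolean metadataWritten = false;

        LoggedState(String name, InMemoryChangelog log) {
            this.name = name;
            this.log = log;
        }

        void update(String value) {
            if (!metadataWritten) {
                log.append("meta:" + name);
                metadataWritten = true;
            }
            log.append("data:" + name + "=" + value);
        }

        /** Called when materialization starts, so later data is preceded by metadata. */
        void resetMetadataWritten() {
            metadataWritten = false;
        }
    }

    static List<String> demo() {
        InMemoryChangelog log = new InMemoryChangelog();
        LoggedState state = new LoggedState("s", log);
        state.update("1"); // log: meta:s, data:s=1

        // Materialization starts: remember the SQN and reset the flag.
        int materializedSqn = log.nextSqn();
        state.resetMetadataWritten();

        state.update("2"); // metadata is written again before this change
        state.update("3"); // flag is set, so only data is appended

        // Materialization completes: everything before materializedSqn may be dropped.
        log.truncate(materializedSqn);
        return log.persist(materializedSqn);
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [meta:s, data:s=2, data:s=3]
    }
}
```

Without the reset, the surviving log would start with data:s=2 and carry no 
metadata for the state, which is the problem this ticket describes.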

—
 Another related change is to skip writing metadata on recovery (only if state 
was read from the changelog). 
 This can be achieved by setting the flag when requesting the state from 
ChangeLogApplier.
 *Please create a separate ticket for that if not implementing in this one.*

—
 Note: with TM-side state ownership, actual log truncation may be delayed after 
materialization (until all the checkpoints using the log are subsumed). This 
should not affect the above logic.
  
 

  was:
Currently, the changelog state backend writes state metadata to the changelog 
on first state access.
 On materialization, the changelog can be truncated, so the metadata needs to 
be written again.

 

Below is a proposed solution using the existing metadataWritten flag.

An alternative would be to write metadata at the end of the materialized stream.
Yet another approach is to write metadata to a separate file (however, it seems 
less optimal than at the end of the materialized stream and not so easy as 
writing again).

There are several questions to answer:
 - *When to mark* the metadata as not written (i.e. reset the metadataWritten 
flag)?
 ** After starting the materialization - so that any subsequent data is 
preceded by metadata
 - *When to request* the write (i.e. call append)?
     At any point (materialization start / materialization end / checkpoint 
start). It doesn't matter for correctness - see the next points.
 Scheduling append earlier means:
 -- including metadata in the changelog twice unnecessarily (won't hurt 
correctness)
 -- writing for nothing if materialization fails

Scheduling append later means slowing down the checkpoint.
 So materialization end seems to be the better tradeoff.
 - *What* metadata to write?
      Only for data which were changed after materialization started (so the 
flag is enough)
 - *Where* in changelog to write it to?
     No choice but to the end of the changelog. Because of updating SQN, the 
metadata will appear at the beginning of the state object returned by 
persist(sqn) called after materialization completes.
 - *How to wait for write completion* (before completing checkpoint)?
 Once appended, the future returned from persist() call should include it 
already
  

So to achieve this it's enough to call appendMetadata() for each changed state 
upon materialization start, or finish, or 1st checkpoint after it.

It can be further optimized by storing the SQN at which the metadata was 
written and only resetting the flag if materializedSqn >= metadataSqn; but 
materialization is relatively rare, so it probably isn't worth it.

—
 Another related change is to skip writing metadata on recovery (only if state 
was read from the changelog). 
 This can be achieved by setting the flag when requesting the state from 
ChangeLogApplier.
 *Please create a separate ticket for that if not implementing in this one.*

—
 Note: with TM-side state ownership, actual log truncation may be delayed after 
materialization (until all the checkpoints using the log are subsumed). This 
should not affect the above logic.
  


> Write metadata after materialization
> ------------------------------------
>
>                 Key: FLINK-23170
>                 URL: https://issues.apache.org/jira/browse/FLINK-23170
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Runtime / State Backends
>            Reporter: Roman Khachatryan
>            Priority: Major
>             Fix For: 1.14.0
>
>
> Currently, the changelog state backend writes state metadata to the 
> changelog on first state access.
>  On materialization, the changelog can be truncated, so the metadata needs to 
> be written again.
>  
> Below is a proposed solution using the existing metadataWritten flag.
> An alternative would be to write metadata at the end of the materialized 
> stream.
>  Yet another approach is to write metadata to a separate file (however, it 
> seems less optimal than at the end of the materialized stream and not so easy 
> as writing again).
> There are several questions to answer:
>  - *When to mark* the metadata as not written (i.e. reset the metadataWritten 
> flag)?
>  ** After starting the materialization - so that any subsequent data is 
> preceded by metadata
>  - *When to request* the write (i.e. call append)?
>      At any point (materialization start / materialization end / checkpoint 
> start). It doesn't matter for correctness - see the next points.
>  Scheduling append earlier means:
>  -- including metadata in the changelog twice unnecessarily (won't hurt 
> correctness)
>  -- writing for nothing if materialization fails
> Scheduling append later means slowing down the checkpoint.
>  So materialization end seems to be the better tradeoff.
>  - *What* metadata to write?
>       Only for data which were changed after materialization started (so the 
> flag is enough)
>  - *Where* in changelog to write it to?
>      No choice but to the end of the changelog. Because of updating SQN, the 
> metadata will appear at the beginning of the state object returned by 
> persist(sqn) called after materialization completes.
>  - *How to wait for write completion* (before completing checkpoint)?
>  Once appended, the future returned from persist() call should include it 
> already
>   
> So to achieve this it's enough to call appendMetadata() for each changed 
> state upon materialization start, or finish, or 1st checkpoint after it.
> —
>  Another related change is to skip writing metadata on recovery (only if 
> state was read from the changelog). 
>  This can be achieved by setting the flag when requesting the state from 
> ChangeLogApplier.
>  *Please create a separate ticket for that if not implementing in this one.*
> —
>  Note: with TM-side state ownership, actual log truncation may be delayed 
> after materialization (until all the checkpoints using the log are subsumed). 
> This should not affect the above logic.
>   
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
