Thanks for clarifying.

Concerning the JM aborted checkpoints and state handles: I was thinking
about it the other day as well and was considering an approach like that:

The core idea is to move the cleanup from JM to TM. That solves two issues:

(1) The StateBackends / DSTL delete the artifacts themselves, meaning we
don't have to make assumptions about the state on the JM. That sounds too
fragile, with easy bugs as soon as some slight assumptions change (see also
bug with incr. checkpoint / savepoint data loss,
https://issues.apache.org/jira/browse/FLINK-21351)

(2) We do not need to clean up from one node. In the past, doing the
cleanup from one node (JM) has sometimes become a bottleneck.

To achieve that, we would need to extend the "notifyCheckpointComplete()"
RPC from the JM to the TM includes both the ID of the completed checkpoint,
and the ID of the earliest retained checkpoint. Then the TM can clean up
all artifacts from earlier checkpoints.

There are two open questions to that design:
(1) On restore, we need to communicate the state handles of the previous
checkpoints to the TM as well, so the TM gets again the full picture of all
state artifacts.
(2) On rescaling, we need to clarify which TM is responsible for releasing
a handle, if they are mapped to multiple TMs. Otherwise we get
double-delete calls. That isn't per se a problem, it is just a bit less
efficient.


Maybe we could think in that direction for the DSTL work?



On Mon, Feb 15, 2021 at 8:44 PM Roman Khachatryan <ro...@apache.org> wrote:

> Thanks for your reply Stephan.
>
> Yes, there is overlap between FLIP-151 and FLIP-158 as both
> address incremental state updates. However, I think that FLIP-151 on top
> of FLIP-158 increases efficiency by:
>
> 1. "Squashing" the changes made to the same key. For example, if some
> counter was changed 10 times then FLIP-151 will send only the last value
> (this allows to send AND store less data compared to FLIP-158)
>
> 2. Keeping in memory only the changed keys and not the values.
> (this allows to reduce memory AND latency (caused by serialization +
> copying on every update) compared to FLIP-158)
>
> (1) can probably be implemented in FLIP-158, but not (2).
>
> I don't think there will be a lot of follow-up efforts and I hope
> @Dawid Wysakowicz <dwysakow...@apache.org>, @pnowojski
> <pnowoj...@apache.org> , Yuan Mei and probably
> @Yu Li <car...@gmail.com>  will be able to join at different stages.
>
> Regarding using only the confirmed checkpoints, you are right: JM can
> abort non-confirmed checkpoints and discard the state. FLIP-158 has
> the same problem because StateChangelog produces StateHandles that
>  can be discarded by the JM. Currently, potentially discarded
> changes are re-uploaded in both FLIPs.
>
> In FLIP-158 (or follow-up), I planned to improve this part by:
> 1. Limiting max-concurrent-checkpoints to 1, and
> 2. Sending the last confirmed checkpoint ID in RPCs and barriers
> So at the time of checkpoint, backend knows exactly which changes can be
> included.
>
> Handling of removed keys is not related to the aborted checkpoints. They
> are
> needed on recovery to actually remove data from the previous snapshot.
> In FLIP-158 it is again similar: ChangelogStateBackend has to encode
> removal operations and send them to StateChangelog (though no additional
> data structure is required).
>
> Regards,
> Roman
>
>
> On Thu, Feb 11, 2021 at 4:28 PM Stephan Ewen <se...@apache.org> wrote:
>
> > Thanks, Roman for publishing this design.
> >
> > There seems to be quite a bit of overlap with FLIP-158 (generalized
> > incremental checkpoints).
> >
> > I would go with +1 to the effort if it is a pretty self-contained and
> > closed effort. Meaning we don't expect that this needs a ton of
> follow-ups,
> > other than common maintenance and small bug fixes. If we expect that this
> > requires a lot of follow-ups, then we end up splitting our work between
> > this FLIP and FLIP-158, which seems a bit inefficient.
> > What other committers would be involved to ensure the community can
> > maintain this?
> >
> >
> > The design looks fine, in general, with one question:
> >
> > When persisting changes, you persist all changes that have a newer
> version
> > than the latest one confirmed by the JM.
> >
> > Can you explain why it is like that exactly? Alternatively, you could
> keep
> > the latest checkpoint ID for which the state backend persisted the diff
> > successfully to the checkpoint storage, and created a state handle. For
> > each checkpoint, the state backend includes the state handles of all
> > involved chunks. That would be similar to the log-based approach in
> > FLIP-158.
> >
> > I have a suspicion that this is because the JM may have released the
> state
> > handle (and discarded the diff) for a checkpoint that succeeded on the
> task
> > but didn't succeed globally. So we cannot reference any state handle that
> > has been handed over to the JobManager, but is not yet confirmed.
> >
> > This characteristic seems to be at the heart of much of the complexity,
> > also the handling of removed keys seems to be caused by that.
> > If we could change that assumption, the design would become simpler.
> >
> > (Side note: I am wondering if this also impacts the FLIP-158 DSTL
> design.)
> >
> > Best,
> > Stephan
> >
> >
> > On Sun, Nov 15, 2020 at 8:51 AM Khachatryan Roman <
> > khachatryan.ro...@gmail.com> wrote:
> >
> > > Hi Stefan,
> > >
> > > Thanks for your reply. Very interesting ideas!
> > > If I understand correctly, SharedStateRegistry will still be
> responsible
> > > for pruning the old state; for that, it will maintain some (ordered)
> > > mapping between StateMaps and their versions, per key group.
> > > I think one modification to this approach is needed to support
> > journaling:
> > > for each entry, maintain a version when it was last fully snapshotted;
> > and
> > > use this version to find the minimum as you described above.
> > > I'm considering a better state cleanup and optimization of removals as
> > the
> > > next step. Anyway, I will add it to the FLIP document.
> > >
> > > Thanks!
> > >
> > > Regards,
> > > Roman
> > >
> > >
> > > On Tue, Nov 10, 2020 at 12:04 AM Stefan Richter <
> > stefanrichte...@gmail.com
> > > >
> > > wrote:
> > >
> > > > Hi,
> > > >
> > > > Very happy to see that the incremental checkpoint idea is finally
> > > becoming
> > > > a reality for the heap backend! Overall the proposal looks pretty
> good
> > to
> > > > me. Just wanted to point out one possible improvement from what I can
> > > still
> > > > remember from my ideas back then: I think you can avoid doing
> periodic
> > > full
> > > > snapshots for consolidation. Instead, my suggestion would be to track
> > the
> > > > version numbers you encounter while you iterate a snapshot for
> writing
> > > it -
> > > > and then you should be able to prune all incremental snapshots that
> > were
> > > > performed with a version number smaller than the minimum you find. To
> > > avoid
> > > > the problem of very old entries that never get modified you could
> start
> > > > spilling entries with a certain age-difference compared to the
> current
> > > map
> > > > version so that eventually all entries for an old version are
> > re-written
> > > to
> > > > newer snapshots. You can track the version up to which this was done
> in
> > > the
> > > > map and then you can again let go of their corresponding snapshots
> > after
> > > a
> > > > guaranteed time.So instead of having the burden of periodic large
> > > > snapshots, you can make every snapshot work a little bit on the
> cleanup
> > > and
> > > > if you are lucky it might happen mostly by itself if most entries are
> > > > frequently updated. I would also consider to make map clean a special
> > > event
> > > > in your log and consider unticking the versions on this event - this
> > > allows
> > > > you to let go of old snapshots and saves you from writing a log of
> > > > antimatter entries. Maybe the ideas are still useful to you.
> > > >
> > > > Best,
> > > > Stefan
> > > >
> > > > On 2020/11/04 01:54:25, Khachatryan Roman <k...@gmail.com> wrote:
> > > > > Hi devs,>
> > > > >
> > > > > I'd like to start a discussion of FLIP-151: Incremental snapshots
> > for>
> > > > > heap-based state backend [1]>
> > > > >
> > > > > Heap backend, while being limited state sizes fitting into memory,
> > also
> > > > has>
> > > > > some advantages compared to RocksDB backend:>
> > > > > 1. Serialization once per checkpoint, not per state modification.
> > This>
> > > > > allows to “squash” updates to the same keys>
> > > > > 2. Shorter synchronous phase (compared to RocksDB incremental)>
> > > > > 3. No need for sorting and compaction, no IO amplification and JNI
> > > > overhead>
> > > > > This can potentially give higher throughput and efficiency.>
> > > > >
> > > > > However, Heap backend currently lacks incremental checkpoints. This
> > > > FLIP>
> > > > > aims to add initial support for them.>
> > > > >
> > > > > [1]>
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-151%3A+Incremental+snapshots+for+heap-based+state+backend
> > > >
> > > >
> > > > >
> > > > >
> > > > > Any feedback highly appreciated.>
> > > > >
> > > > > Regards,>
> > > > > Roman>
> > > > >
> > >
> >
>

Reply via email to