Hi Sonam,

Sorry for the late reply. We were caught up in the feature freeze for the
next major Flink release.

In general, I think it is a very good idea to disaggregate the local state
storage to make it reusable across TaskManager failures. However, it is
also not trivial to do.

Maybe let me first describe how the current task local recovery works and
then see how we could improve it:

Flink creates an AllocationID for every slot allocation. The AllocationID
associates a slot on a TaskExecutor with a job and also scopes the lifetime
of a slot with respect to a job (theoretically, one and the same slot
could be used to fulfill multiple slot requests of the same job if the slot
allocation is freed in between). Note that the AllocationID is a random ID
and, thus, changes whenever the ResourceManager allocates a new slot on a
TaskExecutor for a job.

Task local recovery is effectively a state cache which is associated with
an AllocationID. So for every checkpoint and every task, a TaskExecutor
copies the state data and stores the copy in the task local recovery cache.
The cache is maintained as long as the slot allocation is valid (i.e. the
slot has not been freed by the JobMaster and has not timed out). This
makes the lifecycle management of the state data quite easy and makes sure
that a process does not clutter local disks. On the JobMaster side, Flink
remembers where every Execution is deployed (it remembers the
AllocationID). If a failover happens, then Flink tries to re-deploy the
Executions into the slots they were running in before by matching the
AllocationIDs.
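
To make the scoping concrete, here is a minimal sketch of a cache keyed by
AllocationID. It is not Flink's actual implementation: the class names, the
use of plain strings for IDs, and the example path are all made up for
illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;

// Hypothetical sketch, not Flink's real classes.
class LocalRecoverySketch {

    /** TaskExecutor side: local state copies, scoped to the owning allocation. */
    static final class LocalStateCache {
        // allocationId -> (task name -> path of the latest local state copy)
        private final Map<String, Map<String, String>> byAllocation = new HashMap<>();

        void store(String allocationId, String task, String localPath) {
            byAllocation.computeIfAbsent(allocationId, k -> new HashMap<>())
                        .put(task, localPath);
        }

        Optional<String> lookup(String allocationId, String task) {
            Map<String, String> tasks = byAllocation.get(allocationId);
            return tasks == null ? Optional.empty() : Optional.ofNullable(tasks.get(task));
        }

        /** Called when the slot is freed: the cached state dies with the allocation. */
        void release(String allocationId) {
            byAllocation.remove(allocationId);
        }
    }

    public static void main(String[] args) {
        LocalStateCache cache = new LocalStateCache();
        String allocation = UUID.randomUUID().toString();
        cache.store(allocation, "map-1", "/tmp/localState/chk-5/map-1");

        // Failover into the same allocation: the local copy is found.
        System.out.println(cache.lookup(allocation, "map-1").isPresent());

        // A restarted TaskExecutor gets a fresh random ID: the lookup misses.
        String newAllocation = UUID.randomUUID().toString();
        System.out.println(cache.lookup(newAllocation, "map-1").isPresent());
    }
}
```

The second lookup is the situation after a TaskExecutor restart: the files
may still be on disk, but nothing maps the new AllocationID to them.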

The reason why we scoped the state cache to an AllocationID was for
simplicity and because we couldn't guarantee that a failed TaskExecutor X
will be restarted on the same machine again and thereby have access to
the same local disk as before. That's also why Flink deletes the cache
directory when a slot is freed or when the TaskExecutor is shut down
gracefully.

With persistent volumes this changes and we can make the TaskExecutors
"stateful" in the sense that we can reuse an already occupied cache. One
rather simple idea could be to also persist the slot allocations of a
TaskExecutor (which slot is allocated and what is its assigned
AllocationID). This information could be used to re-initialize the
TaskExecutor upon restart. That way, it does not have to register at the
ResourceManager and wait for new slot allocations but could directly start
offering its slots to the jobs it remembered. If the TaskExecutor cannot
find the JobMasters for the respective jobs, it would then free the slots
and clear the cache accordingly.
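
A rough sketch of what persisting the slot allocations could look like,
assuming a simple properties file on the persistent volume. The file format,
class name and method names are hypothetical. A real implementation would
also have to store the job each slot belongs to and would have to write the
file atomically.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;

// Hypothetical sketch: slot index -> allocation ID, stored on the volume.
class SlotAllocationSnapshotSketch {

    /** Writes the current slot allocations to the given file. */
    static void persist(Path file, Map<Integer, String> allocations) throws IOException {
        Properties props = new Properties();
        allocations.forEach((slot, id) -> props.setProperty(Integer.toString(slot), id));
        try (OutputStream out = Files.newOutputStream(file)) {
            props.store(out, "slot allocations (hypothetical format)");
        }
    }

    /** Reads the allocations back on restart; empty if no snapshot exists. */
    static Map<Integer, String> restore(Path file) throws IOException {
        Map<Integer, String> allocations = new TreeMap<>();
        if (!Files.exists(file)) {
            return allocations;
        }
        Properties props = new Properties();
        try (InputStream in = Files.newInputStream(file)) {
            props.load(in);
        }
        for (String key : props.stringPropertyNames()) {
            allocations.put(Integer.parseInt(key), props.getProperty(key));
        }
        return allocations;
    }

    public static void main(String[] args) throws IOException {
        Path file = Files.createTempFile("slot-allocations", ".properties");
        Map<Integer, String> before = new TreeMap<>();
        before.put(0, "alloc-a");
        before.put(1, "alloc-b");
        persist(file, before);

        // After a restart, the TaskExecutor would offer these slots again.
        Map<Integer, String> after = restore(file);
        System.out.println(after);
        Files.delete(file);
    }
}
```

On restart, the TaskExecutor would call something like restore(), offer the
recovered slots to the remembered jobs, and fall back to freeing the slots
and clearing the cache if the JobMasters cannot be found.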

This could work as long as the ResourceManager does not start new
TaskExecutors whose slots could be used to recover the job. If this is a
problem, then one needs to answer the question of how long to wait for the
old TaskExecutors to come back so that their local state can be reused vs.
quickly starting a fresh instance that has to restore state remotely.

An alternative proposal, which is probably more powerful albeit also more
complex, would be to make the cache information explicit when
registering the TaskExecutor at the ResourceManager and later offering
slots to the JobMaster. For example, the TaskExecutor could tell the
ResourceManager which states it has locally cached (it probably needs to
contain key group ranges for every stored state) and this information could
be used to decide from which TaskExecutor to allocate slots for a job.
Similarly on the JobMaster side we could use this information to calculate
the best mapping between Executions and slots. I think that mechanism could
better deal with rescaling events where there is no perfect match between
Executions and slots because of the changed key group ranges.
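
As an illustration of the matching idea, the sketch below picks, for one
Execution, the slot whose locally cached key group range overlaps most with
the range the Execution needs after rescaling. Everything here is
hypothetical (class names, slot names, the example ranges), and it only does
a greedy per-Execution choice. A real solution would have to solve the
assignment for all Executions together so that two of them do not pick the
same slot.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch of overlap-based slot selection.
class KeyGroupMatchingSketch {

    /** Inclusive range of key groups, e.g. [0, 63]. */
    static final class KeyGroupRange {
        final int start;
        final int end;

        KeyGroupRange(int start, int end) {
            this.start = start;
            this.end = end;
        }

        /** Number of key groups shared with the other range. */
        int overlap(KeyGroupRange other) {
            int lo = Math.max(start, other.start);
            int hi = Math.min(end, other.end);
            return Math.max(0, hi - lo + 1);
        }
    }

    /** Returns the slot with the largest overlap, or null if nothing overlaps. */
    static String bestSlot(KeyGroupRange required, Map<String, KeyGroupRange> cachedBySlot) {
        String best = null;
        int bestOverlap = 0;
        for (Map.Entry<String, KeyGroupRange> entry : cachedBySlot.entrySet()) {
            int overlap = required.overlap(entry.getValue());
            if (overlap > bestOverlap) {
                bestOverlap = overlap;
                best = entry.getKey();
            }
        }
        return best;
    }

    public static void main(String[] args) {
        // Assumed state before rescaling: 128 key groups split over 2 slots.
        Map<String, KeyGroupRange> cached = new LinkedHashMap<>();
        cached.put("slot-A", new KeyGroupRange(0, 63));
        cached.put("slot-B", new KeyGroupRange(64, 127));

        // Assumed split after rescaling to parallelism 3.
        System.out.println(bestSlot(new KeyGroupRange(0, 42), cached));   // slot-A
        System.out.println(bestSlot(new KeyGroupRange(43, 85), cached));  // slot-B (22 vs 21 groups)
        System.out.println(bestSlot(new KeyGroupRange(86, 127), cached)); // slot-B
    }
}
```

The middle Execution shows the imperfect match: no slot holds all of its key
groups, so whichever slot it lands on, part of the state still has to be
restored remotely.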

So to answer your question: There is currently no way to preserve
AllocationIDs across restarts. However, we could use the persistent volume
to store this information so that we can restore it on restart of a
TaskExecutor. This could enable task local state recovery for cases where
we lose a TaskExecutor and restart it with the same persistent volume.

Cheers,
Till

On Wed, Apr 21, 2021 at 7:26 PM Stephan Ewen <se...@apache.org> wrote:

> /cc dev@flink
>
>
> On Tue, Apr 20, 2021 at 1:29 AM Sonam Mandal <soman...@linkedin.com>
> wrote:
>
> > Hello,
> >
> > We've been experimenting with Task-local recovery using Kubernetes. We
> > have a way to specify mounting the same disk across Task Manager
> > restarts/deletions for when the pods get recreated. In this scenario, we
> > noticed that task local recovery does not kick in (as expected based on
> the
> > documentation).
> >
> > We did try to comment out the code on the shutdown path which cleaned up
> > the task local directories before the pod went down / was restarted. We
> > noticed that remote recovery kicked in even though the task local state
> was
> > present. I noticed that the slot IDs changed, and was wondering if this
> is
> > the main reason that the task local state didn't get used in this
> scenario?
> >
> > Since we're using this shared disk to store the local state across pod
> > failures, would it make sense to allow keeping the task local state so
> that
> > we can get faster recovery even for situations where the Task Manager
> > itself dies? In some sense, the storage here is disaggregated from the
> pods
> > and can potentially benefit from task local recovery. Any reason why this
> > is a bad idea in general?
> >
> > Is there a way to preserve the slot IDs across restarts? We setup the
> Task
> > Manager to pin the resource-id, but that didn't seem to help. My
> > understanding is that the slot ID needs to be reused for task local
> > recovery to kick in.
> >
> > Thanks,
> > Sonam
> >
> >
>
