Re: Task Local Recovery with mountable disks in the cloud

2021-05-11 Thread Yang Wang
Just a side note: not only a persistent volume but also ephemeral storage
could help with keeping the local state for the TaskManager pod.

Ephemeral storage is bound to the lifecycle of the TaskManager pod, so it
could be shared across restarts of the TaskManager container.


Best,
Yang


Re: Task Local Recovery with mountable disks in the cloud

2021-05-10 Thread Sonam Mandal
Hi Till,

Sure, that sounds good. I'll open a FLIP for this when we start working on it.

Thanks for the insights!

Regards,
Sonam


Re: Task Local Recovery with mountable disks in the cloud

2021-05-10 Thread Till Rohrmann
Hi Sonam,

I think it would be great to create a FLIP for this feature. FLIPs don't
have to be super large, and in this case I could see it working to describe the
general idea of making local recovery work across TaskManager failures and
then outline the different ideas we have had so far. If we then decide to go
with persisting the cache information (the AllocationIDs), then this
could be a good outcome. If we decide to go with the more complex solution
of telling the ResourceManager and JobMaster about the ranges of cached
state data, then this is also ok.

Cheers,
Till


Re: Task Local Recovery with mountable disks in the cloud

2021-05-07 Thread Sonam Mandal
Hi Till,

Thanks for getting back to me. Apologies for my delayed response.

Thanks for confirming that the slot ID (Allocation ID) is indeed necessary 
today for task local recovery to kick in, and thanks for your insights on how 
to make this work.

We are interested in exploring this disaggregation between local state storage 
and slots to allow potential reuse of local state even when TMs go down.

I'm planning to spend some time exploring the Flink code around local recovery 
and state persistence. I'm still new to Flink, so any guidance will be helpful. 
I think both of your ideas on how to make this happen are interesting and worth 
exploring. What's the procedure to collaborate or get guidance on this feature? 
Will a FLIP be required, or will opening a ticket do?

Thanks,
Sonam


Re: Task Local Recovery with mountable disks in the cloud

2021-04-26 Thread Till Rohrmann
Hi Sonam,

Sorry for the late reply. We were caught up in the feature
freeze for the next major Flink release.

In general, I think it is a very good idea to disaggregate the local state
storage to make it reusable across TaskManager failures. However, it is
also not trivial to do.

Let me first describe how task local recovery currently works and
then look at how we could improve it:

Flink creates an AllocationID for every slot allocation. The AllocationID
associates a slot on a TaskExecutor with a job and is also used for scoping
the lifetime of a slot with respect to a job (theoretically, one and the same slot
could be used to fulfill multiple slot requests of the same job if the slot
allocation is freed in between). Note that the AllocationID is a random ID
and, thus, changes whenever the ResourceManager allocates a new slot on a
TaskExecutor for a job.

Task local recovery is effectively a state cache which is associated with
an AllocationID. So for every checkpoint and every task, a TaskExecutor
copies the state data and stores them in the task local recovery cache. The
cache is maintained as long as the slot allocation is valid (e.g. the slot
has not been freed by the JobMaster and the slot has not timed out). This
makes the lifecycle management of the state data quite easy and makes sure
that a process does not clutter local disks. On the JobMaster side, Flink
remembers for every Execution where it is deployed (it remembers the
AllocationID). If a failover happens, then Flink tries to re-deploy the
Executions into the slots they were running in before by matching the
AllocationIDs.
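
To make the scoping described above concrete, here is a toy Python model of a state cache keyed by allocation ID. It is only a sketch, not Flink's actual (Java) implementation: `LocalStateCache`, `new_allocation_id`, and the in-memory dictionary are hypothetical names and simplifications chosen for illustration.

```python
import uuid


class LocalStateCache:
    """Toy model of a task-local state cache scoped to allocation IDs."""

    def __init__(self):
        # allocation_id -> {(task_name, checkpoint_id): state_bytes}
        self._entries = {}

    def store(self, allocation_id, task_name, checkpoint_id, state):
        # Called for every checkpoint and every task in this model.
        self._entries.setdefault(allocation_id, {})[(task_name, checkpoint_id)] = state

    def lookup(self, allocation_id, task_name, checkpoint_id):
        # Returns None when nothing is cached under this allocation ID.
        return self._entries.get(allocation_id, {}).get((task_name, checkpoint_id))

    def free_slot(self, allocation_id):
        # Freeing the slot drops everything cached for that allocation.
        self._entries.pop(allocation_id, None)


def new_allocation_id():
    # Allocation IDs are random, so a fresh allocation never matches an old one.
    return uuid.uuid4().hex


cache = LocalStateCache()
old_id = new_allocation_id()
cache.store(old_id, "map-1", 42, b"state")

# Failover into the same allocation: the lookup hits the cache.
print(cache.lookup(old_id, "map-1", 42))

# A newly allocated slot gets a new random ID, so the lookup misses
# even though the old entry is still stored.
print(cache.lookup(new_allocation_id(), "map-1", 42))
```

In this model, the second lookup misses purely because the key changed, which mirrors why remote recovery kicks in after a TaskExecutor restart even if the files are still on disk.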

We scoped the state cache to an AllocationID for simplicity and because we
couldn't guarantee that a failed TaskExecutor X would be restarted on the
same machine and thereby have access to the same local disk as before.
That's also why Flink deletes the cache
directory when a slot is freed or when the TaskExecutor is shut down
gracefully.

With persistent volumes this changes and we can make the TaskExecutors
"stateful" in the sense that we can reuse an already occupied cache. One
rather simple idea could be to also persist the slot allocations of a
TaskExecutor (which slot is allocated and what is its assigned
AllocationID). This information could be used to re-initialize the
TaskExecutor upon restart. That way, it does not have to register at the
ResourceManager and wait for new slot allocations but could directly start
offering its slots to the jobs it remembered. If the TaskExecutor cannot
find the JobMasters for the respective jobs, it would then free the slots
and clear the cache accordingly.
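
The simple idea above could look roughly like the following Python sketch. Everything here is an assumption made for illustration: the JSON file layout, the file name, and the functions `persist_allocations` and `restore_allocations` are hypothetical and do not exist in Flink. The set `reachable_jobs` stands in for "jobs whose JobMaster the TaskExecutor can find".

```python
import json
import os
import tempfile


def persist_allocations(path, allocations):
    """Atomically write {slot_index: {"allocation_id", "job_id"}} to path."""
    directory = os.path.dirname(path) or "."
    fd, tmp_path = tempfile.mkstemp(dir=directory)
    with os.fdopen(fd, "w") as f:
        json.dump(allocations, f)
    # Rename over the target so a crash never leaves a half-written file.
    os.replace(tmp_path, path)


def restore_allocations(path, reachable_jobs):
    """Split persisted slots into those to re-offer and those to free."""
    if not os.path.exists(path):
        return {}, {}
    with open(path) as f:
        persisted = json.load(f)
    offer, free = {}, {}
    for slot, info in persisted.items():
        # Slots of jobs without a reachable JobMaster are freed (and their
        # cache would be cleared); the rest are offered again.
        target = offer if info["job_id"] in reachable_jobs else free
        target[slot] = info
    return offer, free


with tempfile.TemporaryDirectory() as volume:  # stands in for the persistent volume
    path = os.path.join(volume, "slot-allocations.json")
    persist_allocations(path, {
        "0": {"allocation_id": "a1", "job_id": "job-A"},
        "1": {"allocation_id": "b2", "job_id": "job-B"},
    })
    offer, free = restore_allocations(path, reachable_jobs={"job-A"})
    print(offer)
    print(free)
```

Slot indices are strings here only because JSON object keys are strings; a real design would pick its own serialization.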

This could work as long as the ResourceManager does not start new
TaskExecutors whose slots could be used to recover the job. If this is a
problem, then one needs to answer the question of how long to wait for the
old TaskExecutors to come back and reuse their local state vs. quickly
starting a fresh instance and having to restore state remotely.

An alternative proposal, which is probably more powerful albeit
also more complex, would be to make the cache information explicit when
registering the TaskExecutor at the ResourceManager and later offering
slots to the JobMaster. For example, the TaskExecutor could tell the
ResourceManager which states it has locally cached (it probably needs to
contain key group ranges for every stored state) and this information could
be used to decide from which TaskExecutor to allocate slots for a job.
Similarly, on the JobMaster side we could use this information to calculate
the best mapping between Executions and slots. I think that mechanism could
better deal with rescaling events where there is no perfect match between
Executions and slots because of the changed key group ranges.
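
As a rough illustration of how advertised key group ranges could drive such a decision, the Python sketch below picks the TaskExecutor whose cached ranges cover most of the range a task needs. The function names, the inclusive `(start, end)` representation, and the example ranges are assumptions for this sketch, not an existing Flink API, and the ranges cached by one executor are assumed not to overlap each other.

```python
def overlap(a, b):
    """Number of key groups shared by two inclusive ranges (start, end)."""
    return max(0, min(a[1], b[1]) - max(a[0], b[0]) + 1)


def best_executor(required, cached_by_executor):
    """Pick the executor whose cached ranges cover most of `required`.

    cached_by_executor maps an executor name to a list of inclusive ranges.
    Returns (name, covered_key_groups), or (None, 0) if nothing overlaps.
    """
    best_name, best_cover = None, 0
    for name, ranges in cached_by_executor.items():
        cover = sum(overlap(required, r) for r in ranges)
        if cover > best_cover:
            best_name, best_cover = name, cover
    return best_name, best_cover


# Hypothetical rescaling case: state was cached for two subtasks, and a new
# subtask now needs key groups 43..85, which neither executor holds completely.
cached = {"tm-1": [(0, 63)], "tm-2": [(64, 127)]}
print(best_executor((43, 85), cached))
```

With no perfect match, the choice falls on the executor with the larger partial overlap; the remaining key groups would still have to be restored remotely.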

So to answer your question: There is currently no way to preserve
AllocationIDs across restarts. However, we could use the persistent volume
to store this information so that we can restore it on restart of a
TaskExecutor. This could enable task local state recovery for cases where
we lose a TaskExecutor and restart it with the same persistent volume.

Cheers,
Till


Re: Task Local Recovery with mountable disks in the cloud

2021-04-21 Thread Stephan Ewen
/cc dev@flink


On Tue, Apr 20, 2021 at 1:29 AM Sonam Mandal wrote:

> Hello,
>
> We've been experimenting with Task-local recovery using Kubernetes. We
> have a way to specify mounting the same disk across Task Manager
> restarts/deletions for when the pods get recreated. In this scenario, we
> noticed that task local recovery does not kick in (as expected based on the
> documentation).
>
> We did try to comment out the code on the shutdown path which cleaned up
> the task local directories before the pod went down / was restarted. We
> noticed that remote recovery kicked in even though the task local state was
> present. I noticed that the slot IDs changed, and was wondering if this is
> the main reason that the task local state didn't get used in this scenario?
>
> Since we're using this shared disk to store the local state across pod
> failures, would it make sense to allow keeping the task local state so that
> we can get faster recovery even for situations where the Task Manager
> itself dies? In some sense, the storage here is disaggregated from the pods
> and can potentially benefit from task local recovery. Any reason why this
> is a bad idea in general?
>
> Is there a way to preserve the slot IDs across restarts? We set up the Task
> Manager to pin the resource-id, but that didn't seem to help. My
> understanding is that the slot ID needs to be reused for task local
> recovery to kick in.
>
> Thanks,
> Sonam
>
>