"One customization we did was to have the job-submitter pod search for the
latest checkpoint or savepoint in S3 and then submit this information with
the Flink job to the Flink cluster"

I am aware that the Google operator does not support redeploying from the
last checkpoint (it always uses a savepoint), so you had to make this
customization. However, the Flink Kubernetes Operator supports last-state
upgrades / restarts, which use the latest checkpoint or savepoint, whichever
was taken last. So you can simply change the spec and the operator will
upgrade with the latest state available (or take a savepoint and upgrade,
if you want).
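
As a rough sketch of what that looks like in the resource: the name, bucket
paths and HA settings below are placeholders I am assuming, not taken from
your setup. In the operator versions I am familiar with, last-state relies
on Kubernetes HA metadata, so please check the upgrade-mode docs for the
version you run.

```yaml
# Fragment of a FlinkDeployment; image, resources, jarURI etc. are omitted.
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: example-job                     # placeholder name
spec:
  flinkConfiguration:
    # Placeholder S3 locations, adjust to your buckets.
    state.checkpoints.dir: s3://example-bucket/checkpoints
    state.savepoints.dir: s3://example-bucket/savepoints
    # Assumption: Kubernetes HA is enabled so the operator can find the
    # last checkpoint when it redeploys the job.
    high-availability: kubernetes
    high-availability.storageDir: s3://example-bucket/ha
  job:
    # Any spec change that triggers an upgrade restores from the latest
    # checkpoint or savepoint, whichever was taken last.
    upgradeMode: last-state
    state: running
```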

"In this case, we had to delete the Flink application first, remove the
corrupted checkpoint or savepoint, and then redeploy the Flink application."

If you are concerned about checkpoint corruption, you can keep doing the
same thing with the Flink Kubernetes Operator. If you notice that an
upgrade/restart is failing due to corruption, you can simply delete the
FlinkDeployment object and re-create it with initialSavepointPath set to
the state you want to restore from. We currently do not support changing
the initialSavepointPath of an already existing FlinkDeployment; for this
you have to delete and recreate it.
If you simply suspend, remove the checkpoint, and then set the job back to
running, the deployment will fail because it won't find the checkpoint, so
you will have to delete the FlinkDeployment to clear any state information.
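
For the recovery case, a minimal sketch of the re-created resource could
look like the fragment below. The savepoint path is simply the one from your
earlier example, and the resource name is a placeholder; everything else in
your spec would stay as it is.

```yaml
# Re-created after deleting the old FlinkDeployment (fragment only).
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: example-job                     # placeholder name
spec:
  job:
    # Only honoured when the deployment is first created; point it at a
    # known-good savepoint or checkpoint.
    initialSavepointPath: s3://savepoints/savepoint-20
    upgradeMode: last-state
    state: running
```

After this first start, later upgrades go back to using the latest state
tracked by the operator, not this path.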

"In addition to restarting from a particular savepoint, is there a way to
restart a Flink application from the latest *checkpoint* while redeploying
the Flink application?"

If you have upgradeMode set to last-state, this is what always happens.
In that mode there is no way to upgrade/restart without using the last
checkpoint/savepoint :)
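
To make the restart case concrete, here is a small fragment (values are
illustrative only) showing the two spec changes that restart a job without
deleting the resource. With last-state, either one should bring the job
back from the latest checkpoint or savepoint.

```yaml
# Fragment of an existing FlinkDeployment spec.
spec:
  # Changing this number to any new value asks the operator to restart
  # the job; the value itself has no meaning.
  restartNonce: 2
  job:
    upgradeMode: last-state
    # Alternatively, set this to suspended and later back to running.
    state: running
```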


I feel that the Flink Operator should already cover your use cases, but
there may be some semantic confusion about how it handles state during
upgrades, spec changes, etc.

Cheers,
Gyula


On Fri, Jul 21, 2023 at 5:20 PM Tony Chen <tony.ch...@robinhood.com> wrote:

> For context, we have forked the GoogleCloudPlatform operator (
> https://github.com/GoogleCloudPlatform/flink-on-k8s-operator), and we
> have customized it a bit to fit our use cases here. One customization we
> did was to have the job-submitter pod search for the latest checkpoint or
> savepoint in S3 and then submit this information with the Flink job to the
> Flink cluster.
>
> In the past we had some incidents where the checkpoint or savepoint was
> corrupted, and when the Flink application was trying to start again, it
> wasn't able to start from the latest checkpoint or savepoint. In this case,
> we had to delete the Flink application first, remove the corrupted
> checkpoint or savepoint, and then redeploy the Flink application.
>
> We are in the process of migrating to the Apache Flink Kubernetes
> Operator, and we would like to ensure that some of the customizations we
> did with the GoogleCloudPlatform operator can be performed with the Apache
> operator. I'm guessing that with the Apache operator, instead of deleting
> the Flink application, we should put the Flink application in a
> "suspended" state first, manually clean up the corrupted checkpoint or
> savepoint in S3, and then put the Flink application in a "running" state
> again?
>
> In addition to restarting from a particular savepoint, is there a way to
> restart a Flink application from the latest *checkpoint* while
> redeploying the Flink application? I was wondering if there's a field in
> the Kubernetes manifest where I can specify which checkpoint to start from.
> For some of our applications, we complete checkpoints more often
> than savepoints, and we would like these Flink applications to always start
> from the latest checkpoint.
>
> Thanks,
> Tony
>
> On Thu, Jul 20, 2023 at 1:18 AM Gyula Fóra <gyula.f...@gmail.com> wrote:
>
>> Hey!
>>
>> Please help us understand why you need to delete and recreate the
>> FlinkDeployment objects in your ecosystem. Maybe we can help suggest some
>> alternative to make your life easier :)
>>
>> Of course every prod ecosystem is unique in its own way and larger
>> platforms generally have a layer on top of the operator to manage these
>> special requirements.
>>
>> In most cases it’s possible to contribute these changes to Flink as long
>> as they fit the scope / larger development direction of the project. This
>> would require a FLIP.
>>
>> But before going there I think it’s worth talking about this
>> delete/recreate requirement because it sounds a bit strange in the
>> Kubernetes world. We specifically designed the operator so that you
>> wouldn’t have to do this if you want the latest state, and so far this
>> is the first time I have heard this ask :)
>>
>> Cheers
>> Gyula
>>
>> On Thu, 20 Jul 2023 at 00:07, Tony Chen <tony.ch...@robinhood.com> wrote:
>>
>>> Hi Gyula,
>>>
>>> Got it. Our use case might be unique to our own ecosystem here at
>>> Robinhood, so I will have to look into creating a service that can search
>>> for the latest savepoint / checkpoint in S3 and provide that to the
>>> FlinkDeployment resource.
>>>
>>> Will the Flink Community be okay with us adding this feature to the
>>> GitHub repo eventually? I was going through this guide
>>> <https://flink.apache.org/how-to-contribute/contribute-code/>, and it
>>> looks like I need to get consensus first.
>>>
>>> Thanks,
>>> Tony
>>>
>>> On Wed, Jul 19, 2023 at 4:33 PM Gyula Fóra <gyula.f...@gmail.com> wrote:
>>>
>>>> Hi!
>>>>
>>>> I don’t understand why you need to delete the deployment to restart.
>>>> You can suspend, use the restartNonce or simply upgrade.
>>>>
>>>> These should cover most upgrade/restart scenarios. Like with other
>>>> resources in Kubernetes once you delete them the status is gone, so the
>>>> FlinkDeployment won’t keep the last state info.
>>>>
>>>> To keep the state after deletion you would have to introduce new
>>>> resources or an external state store. We are not planning to support this
>>>> as it goes against the standard Kubernetes resource management flow.
>>>>
>>>> I think you should look into simply suspending the job for a while or
>>>> just using a regular upgrade to fit your needs.
>>>>
>>>> Cheers
>>>> Gyula
>>>>
>>>> On Wed, 19 Jul 2023 at 22:19, Tony Chen <tony.ch...@robinhood.com>
>>>> wrote:
>>>>
>>>>> Hi Gyula,
>>>>>
>>>>> Thank you for responding so quickly. I went through the page you sent
>>>>> me a bit more, and I see the following (
>>>>> https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.4/docs/custom-resource/job-management/#running-suspending-and-deleting-applications
>>>>> ):
>>>>>
>>>>>> Deleting a deployment will remove all checkpoint and status
>>>>>> information. Future deployments will start from an empty state unless
>>>>>> manually overridden by the user.
>>>>>>
>>>>>
>>>>> For our use case, we do delete the deployment and redeploy the Flink
>>>>> application sometimes in order to restart our Flink applications. We were
>>>>> wondering if it's possible for the operator to retain checkpoint and
>>>>> status information even after the deployment gets deleted.
>>>>>
>>>>> Thanks,
>>>>> Tony
>>>>>
>>>>> On Wed, Jul 19, 2023 at 3:46 PM Gyula Fóra <gyula.f...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hey Tony,
>>>>>>
>>>>>> Please see:
>>>>>> https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/custom-resource/job-management/#stateful-and-stateless-application-upgrades
>>>>>>
>>>>>> The operator is made especially to handle stateful application
>>>>>> upgrades robustly. In general, any spec change you make that leads to
>>>>>> an upgrade will be executed using the latest available checkpoint or
>>>>>> savepoint. This is controlled by the `upgradeMode` setting for jobs: as
>>>>>> long as you have last-state or savepoint, you will always get the latest
>>>>>> state.
>>>>>>
>>>>>> This is somewhat orthogonal to the savepoint trigger /
>>>>>> initialSavepointPath mechanisms. The initialSavepointPath should be used
>>>>>> only the first time the deployment is created because at that point the
>>>>>> operator is not aware of the latest state. After that all upgrades always
>>>>>> use the latest state unless the upgradeMode is stateless in which case no
>>>>>> state is used. Savepoint triggering can help you keep backups for failure
>>>>>> recovery but they should not be executed as part of your upgrade flow
>>>>>> because the operator already does this for you.
>>>>>>
>>>>>> Cheers,
>>>>>> Gyula
>>>>>>
>>>>>> On Wed, Jul 19, 2023 at 8:20 PM Tony Chen <tony.ch...@robinhood.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Flink Community,
>>>>>>>
>>>>>>> My name is Tony Chen, and I am a software engineer at Robinhood. I
>>>>>>> have some questions on restarting a Flink Application from a savepoint
>>>>>>> or checkpoint.
>>>>>>>
>>>>>>> We currently store our checkpoints and savepoints in S3, and we
>>>>>>> would like to use the Apache Flink Kubernetes Operator to manage our
>>>>>>> Flink applications. I know that there is a field called
>>>>>>> "initialSavepointPath" (doc
>>>>>>> <https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/custom-resource/job-management/#manual-recovery>)
>>>>>>> that I can set in my kubernetes manifest so that whenever I want my
>>>>>>> Flink application to start from a particular savepoint, it will start
>>>>>>> from the savepoint directory in this field. However, if I delete this
>>>>>>> FlinkDeployment resource altogether after new savepoints were triggered,
>>>>>>> and then redeploy this FlinkDeployment resource, it looks like I have to
>>>>>>> manually update the "initialSavepointPath" to a newer savepoint
>>>>>>> directory so that the Flink application starts from a newer savepoint.
>>>>>>>
>>>>>>> Is there a way for us to redeploy FlinkDeployment resources so that
>>>>>>> the latest checkpoint or savepoint is used, and without having to update
>>>>>>> the "initialSavepointPath" field? I noticed in my testing that whenever
>>>>>>> I deleted the FlinkDeployment resource and redeployed, it would either
>>>>>>> start from the savepoint in initialSavepointPath or from checkpoint 1 if
>>>>>>> initialSavepointPath was not set.
>>>>>>>
>>>>>>> For example, let's say I restarted a Flink application at savepoint
>>>>>>> 10 with initialSavepointPath set to s3://savepoints/savepoint-10, and
>>>>>>> then later on a savepoint 20 was completed and stored at
>>>>>>> s3://savepoints/savepoint-20. Is there a way for me to delete this
>>>>>>> FlinkDeployment and redeploy it without updating initialSavepointPath?
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Tony
>>>>>>>
>>>>>>> P.S. I'm going through the source code more for Apache Flink
>>>>>>> Kubernetes Operator to understand how the operator starts a Flink job.
>>>>>>> Some relevant code:
>>>>>>>
>>>>>>>    - https://github.com/apache/flink-kubernetes-operator/blob/0c341ebe13645f4e9802cfd780c5b50f59e29363/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java#L500
>>>>>>>    - https://github.com/apache/flink-kubernetes-operator/blob/0c341ebe13645f4e9802cfd780c5b50f59e29363/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SavepointObserver.java#L204
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
> --
>
> <http://www.robinhood.com/>
>
> Tony Chen
>
> Software Engineer
>
> Menlo Park, CA
>
> Don't copy, share, or use this email without permission. If you received
> it by accident, please let us know and then delete it right away.
>
