Re: K8s operator - Stop Job with savepoint on session cluster via Java API
Hi Krzysztof,

Please set upgradeMode to savepoint and change the job state from running to suspended on your resource. This triggers an upgrade (as at least the job state changes), and for the upgrade the operator explicitly takes a savepoint because that is the chosen upgrade mode. Importantly, you do not need to use savepointTriggerNonce for this mechanism at all; also, whether you run in session mode or application mode is of secondary concern.

Savepoints triggered by this mechanism are canonical by default. The configuration for the savepoint is taken from the Flink configuration at the application level (potentially inherited from operator/cluster-level defaults); you cannot pass it in the spec as of today.

Best,
Marton

On Fri, Sep 1, 2023 at 9:31 AM Krzysztof Chmielewski <krzysiek.chmielew...@gmail.com> wrote:
> Hi, thanks.
> However, what you have sent me is the SQL client. I'm looking for a way to
> do it via the k8s operator's Java API.
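Marton's suggestion can be illustrated with a spec fragment (a sketch only; the resource names, namespace, and jarURI below are placeholders, while the field names follow the operator's FlinkSessionJob CRD):

```yaml
# Sketch: stop-with-savepoint on a session cluster via the operator.
# With upgradeMode: savepoint, changing job.state from running to
# suspended makes the operator take a savepoint as part of the suspend.
apiVersion: flink.apache.org/v1beta1
kind: FlinkSessionJob
metadata:
  name: my-session-job                # placeholder
spec:
  deploymentName: my-session-cluster  # placeholder: the session cluster CR
  job:
    jarURI: https://example.com/job.jar  # placeholder
    upgradeMode: savepoint
    state: suspended                  # changed from: running
```

The same field changes can be applied programmatically with the operator's Java API model classes and a Kubernetes client; only the two job fields above need to differ from the running spec.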
Re: K8s operator - Stop Job with savepoint on session cluster via Java API
Hi, thanks. However, what you have sent me is the SQL client. I'm looking for a way to do it via the k8s operator's Java API.

On Fri, Sep 1, 2023 at 03:58, Shammon FY wrote:
> Hi Krzysztof,
>
> For the flink session cluster, you can stop the job with savepoint through
> the statement `STOP JOB '{Your job id}' WITH SAVEPOINT;`.
Re: K8s operator - Stop Job with savepoint on session cluster via Java API
Hi Krzysztof,

For the Flink session cluster, you can stop the job with a savepoint through the statement `STOP JOB '{Your job id}' WITH SAVEPOINT;`. You can refer to [1] for more information about how to do it in the SQL client, and you can also create a table environment to execute the statement in your application.

[1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sqlclient/#terminating-a-job

Best,
Shammon FY

On Fri, Sep 1, 2023 at 6:35 AM Krzysztof Chmielewski <krzysiek.chmielew...@gmail.com> wrote:
> Hi community,
> I would like to ask what is the recommended way to stop a Flink job with
> a savepoint on a session cluster via the k8s operator Java API?
K8s operator - Stop Job with savepoint on session cluster via Java API
Hi community,

I would like to ask what is the recommended way to stop a Flink job with a savepoint on a session cluster via the k8s operator Java API?

Currently I'm doing this by setting savepointTriggerNonce on the JobSpec object. However, I've noticed that this works only if I do not include a job state change in that spec. In other words, when I submit a JobSpec that has a state change from Running to Suspended and a savepointTriggerNonce, the savepoint is not created. Is that intended? In order to mimic [1], do I have to submit two JobSpec updates, one with the savepointTriggerNonce and a second one with the job state change?

A follow-up question: what kind of savepoint is triggered when using savepointTriggerNonce, native or canonical? Also, is there a way to pass the --drain option or the savepoint path via the spec (not counting the state.savepoints.dir cluster config option)?

[1] https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/savepoints/#stopping-a-job-with-savepoint

Thanks,
Krzysztof Chmielewski
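For reference, the nonce-based mechanism described above amounts to bumping a single spec field while leaving the job state untouched (a sketch; the nonce value is arbitrary and only needs to differ from the previous one):

```yaml
# Sketch: triggering a manual savepoint without changing job state.
# Setting savepointTriggerNonce to a new value makes the operator
# trigger a savepoint; the job keeps running.
spec:
  job:
    state: running              # unchanged, so no upgrade is triggered
    savepointTriggerNonce: 104  # bump to any new value to trigger a savepoint
```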
Re: stop job with Savepoint
Hi Alexey,

The list looks complete to me. Please report back if this is not correct.

On Sat, Feb 20, 2021 at 11:30 PM Alexey Trenikhun wrote:
> Adding "list" to verbs helps, do I need to add anything else?
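The fix Alexey applied, adding `list` to the verbs, corresponds to an RBAC rule along these lines (a sketch; the role name is a placeholder, while the namespace and service account match the error message in this thread):

```yaml
# Sketch: Role granting the Flink service account access to configmaps.
# 'list' is what the Kubernetes HA services needed during cluster cleanup
# in the reported failure.
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: flink-ha-role   # placeholder
  namespace: n
rules:
  - apiGroups: [""]
    resources: ["configmaps"]
    verbs: ["get", "create", "update", "patch", "delete", "watch", "list"]
```

A matching RoleBinding would bind this Role to the `fsp` service account in namespace `n`.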
Re: stop job with Savepoint
Adding "list" to verbs helps, do I need to add anything else?

From: Alexey Trenikhun
Sent: Saturday, February 20, 2021 2:10 PM
To: Flink User Mail List
Subject: stop job with Savepoint

Hello,
I'm running a per-job Flink cluster; the JM is deployed as a Kubernetes Job with restartPolicy: Never, and high availability uses KubernetesHaServicesFactory. The job runs fine for some time and the configmaps are created. Now, in order to upgrade the Flink job, I'm trying to stop the job with a savepoint (flink stop $JOB_ID), and the JM exits with code 2.
stop job with Savepoint
Hello,

I'm running a per-job Flink cluster. The JM is deployed as a Kubernetes Job with restartPolicy: Never, and high availability uses KubernetesHaServicesFactory. The job runs fine for some time, the configmaps are created, etc. Now, in order to upgrade the Flink job, I'm trying to stop the job with a savepoint (flink stop $JOB_ID). The JM exits with code 2; from the log:

{"ts":"2021-02-20T21:34:18.195Z","message":"Terminating cluster entrypoint process StandaloneApplicationClusterEntryPoint with exit code 2.","logger_name":"org.apache.flink.runtime.entrypoint.ClusterEntrypoint","thread_name":"flink-akka.actor.default-dispatcher-2","level":"INFO","level_value":2,"stack_trace":"
java.util.concurrent.ExecutionException: io.fabric8.kubernetes.client.KubernetesClientException: Failure executing: GET at: https://10.96.0.1/api/v1/namespaces/n/configmaps?labelSelector=app%3Dfsp%2Cconfigmap-type%3Dhigh-availability%2Ctype%3Dflink-native-kubernetes. Message: Forbidden!Configured service account doesn't have access. Service account may have been revoked. configmaps is forbidden: User "system:serviceaccount:n:fsp" cannot list resource "configmaps" in API group "" in the namespace "n".
	at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
	at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
	at org.apache.flink.kubernetes.highavailability.KubernetesHaServices.internalCleanup(KubernetesHaServices.java:142)
	at org.apache.flink.runtime.highavailability.AbstractHaServices.closeAndCleanupAllData(AbstractHaServices.java:180)
	at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.stopClusterServices(ClusterEntrypoint.java:378)
	at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$shutDownAsync$3(ClusterEntrypoint.java:467)
	at org.apache.flink.runtime.concurrent.FutureUtils.lambda$composeAfterwards$19(FutureUtils.java:704)
	at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
	at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
	at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
	at org.apache.flink.runtime.concurrent.FutureUtils.lambda$null$18(FutureUtils.java:715)
	at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
	at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
	at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
	at org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponent.lambda$closeAsyncInternal$3(DispatcherResourceManagerComponent.java:182)
	at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
	at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
	at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
	at org.apache.flink.runtime.concurrent.FutureUtils$CompletionConjunctFuture.completeFuture(FutureUtils.java:956)
	at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
	at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
	at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
	at org.apache.flink.runtime.concurrent.FutureUtils.lambda$forwardTo$22(FutureUtils.java:1323)
	at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
	at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
	at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: io.fabric8.kubernetes.client.KubernetesClientException: Failure executing: GET at: https://10.96.0.1/api/v1/namespaces/n/configmaps?labelSelector=app%3Dfsp%2Cconfigmap-type%3Dhigh-availability%2Ctype%3Dflink-native-kubernetes. Message: Forbidden!Configured service account doesn't have access. Service account may have been revoked. configmaps is forbidden: User "system:serviceaccount:n:fsp" cannot list resource
Re: Stop job with savepoint during graceful shutdown on a k8s cluster
For point (1) above: it is up to the user to choose a proper source and sink to get exactly-once semantics, as per the documentation: https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/guarantees.html. If we choose one of the supported source and sink combinations, duplicates will be avoided.

For point (2): if the communication between the Job Manager and Task Manager breaks during the savepoint or checkpoint operation, the checkpoint/savepoint will be declined; we can't have a partially completed one.

Regards,
Bhaskar

On Sat, Mar 14, 2020 at 4:54 PM shravan wrote:
> Our understanding is that to stop a job with a savepoint, all the task
> managers will persist their state during the savepoint.
Re: Stop job with savepoint during graceful shutdown on a k8s cluster
> Our understanding is that to stop a job with a savepoint, all the task managers will persist their state during the savepoint. If a Task Manager receives a shutdown signal while a savepoint is being taken, does it complete the savepoint before shutdown?

[Ans] Why is the task manager shut down suddenly? Are you asking about handling an unpredictable shutdown while taking a savepoint? In that case you can also use a retained checkpoint: if the current checkpoint has issues because of the shutdown, you will have the previous checkpoint to fall back on. So you will have two options, savepoint or checkpoint; one of them will always be available.

*[Followup Question]* When the processing service is shut down, say for maintenance, it is a graceful shutdown, and we are looking for a means to avoid duplicates, as exactly-once message processing is guaranteed by our service. We are already starting the job from the checkpoint or savepoint, whichever is the latest. When the job is started from the last good checkpoint, it results in duplicates.

> The Job Manager K8S service is configured as the remote Job Manager address in the Task Manager. This service may not be available during the savepoint; will this affect the communication between Task Manager and Job Manager during the savepoint?

[Ans] You can go for HA, right? There you can run more than one Job Manager so that one is always available.

*[Followup Question]* As we mentioned above, the processing service is shut down for maintenance.

--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
Re: Stop job with savepoint during graceful shutdown on a k8s cluster
Please find answers inline.

> Our understanding is that to stop a job with a savepoint, all the task managers will persist their state during the savepoint. If a Task Manager receives a shutdown signal while a savepoint is being taken, does it complete the savepoint before shutdown?

[Ans] Why is the task manager shut down suddenly? Are you asking about handling an unpredictable shutdown while taking a savepoint? In that case you can also use a retained checkpoint: if the current checkpoint has issues because of the shutdown, you will have the previous checkpoint to fall back on. So you will have two options, savepoint or checkpoint; one of them will always be available.

> The Job Manager K8S service is configured as the remote Job Manager address in the Task Manager. This service may not be available during the savepoint; will this affect the communication between Task Manager and Job Manager during the savepoint?

[Ans] You can go for HA, right? There you can run more than one Job Manager so that one is always available.

On Fri, Mar 13, 2020 at 2:40 PM shravan wrote:
> Job Manager and Task Manager are run as separate pods within the K8S
> cluster in our setup.
Stop job with savepoint during graceful shutdown on a k8s cluster
Job Manager and Task Manager run as separate pods within the K8S cluster in our setup. As a job cluster is not used, the job jars are not part of the Job Manager docker image; the job is submitted from a different Flink client pod. Flink is configured with the RocksDB state backend. The docker images are created by us, as the base OS image needs to be compliant with our organization's guidelines.

We are looking for a reliable approach to stop the job with a savepoint during graceful shutdown, to avoid duplicates on restart. The Job Manager pod traps the shutdown signal and stops all the jobs with savepoints, and the Flink client pod starts the job from the savepoint on restart of the client pod. But as the order in which the pods will be shut down is not predictable, we have the following queries:

1. Our understanding is that to stop a job with a savepoint, all the task managers will persist their state during the savepoint. If a Task Manager receives a shutdown signal while the savepoint is being taken, does it complete the savepoint before shutdown?

2. The Job Manager K8S service is configured as the remote Job Manager address in the Task Manager. This service may not be available during the savepoint; will this affect the communication between Task Manager and Job Manager during the savepoint?

Can you provide some pointers on the internals of savepoints in Flink?
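The shutdown-trap approach described above can also be expressed as a container lifecycle hook on the Job Manager pod (a sketch only; the SAVEPOINT_DIR and JOB_ID environment variables are placeholders not taken from the thread, and the flink CLI is assumed to be on PATH in the image):

```yaml
# Sketch: stop the job with a savepoint before the Job Manager container
# is terminated, so a graceful pod shutdown leaves a fresh savepoint behind.
lifecycle:
  preStop:
    exec:
      command:
        - /bin/sh
        - -c
        - flink stop --savepointPath "${SAVEPOINT_DIR}" "${JOB_ID}"
```

Whether this is reliable still depends on the ordering concern raised above: the pod's terminationGracePeriodSeconds must be long enough for the savepoint to complete, and the Task Manager pods must not be terminated before the Job Manager's hook finishes.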