[ 
https://issues.apache.org/jira/browse/FLINK-27569?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17538829#comment-17538829
 ] 

Matthias Pohl edited comment on FLINK-27569 at 5/18/22 1:42 PM:
----------------------------------------------------------------

Hi [~gyfora], thanks for your patience. I was away from the keyboard for two weeks.
Thanks for jumping in, [~wangyang0918]. As far as I understand, you're trying to
prevent the job from being restarted after a JobManager failover once the job has
terminated in a Flink cluster running in application mode. That's the exact
use case for which {{job-result-store.delete-on-commit}} was introduced (see
FLINK-11813).

On failover, an application cluster needs the JobResultStore (JRS) entry to still
exist in order to know whether the job has already run and finished (and,
therefore, doesn't need to be resubmitted). In that case (i.e. with
{{job-result-store.delete-on-commit}} disabled), the user takes over the
responsibility for cleaning up the JobResultStore entry, as [~wangyang0918]
pointed out. So far, it doesn't look like we're observing unexpected behavior here.
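
To make the reasoning above concrete, here is a minimal sketch of how the presence of a JRS entry could decide whether a job is resubmitted after failover. It is a toy model under stated assumptions, not Flink's implementation: the class {{InMemoryJobResultStore}}, its methods, and the helper {{should_resubmit_after_failover}} are hypothetical names made up for illustration, and the only detail taken from this thread is that {{job-result-store.delete-on-commit}} controls whether the entry survives cleanup.

```python
from dataclasses import dataclass, field


@dataclass
class InMemoryJobResultStore:
    """Hypothetical stand-in for a JobResultStore; not Flink's real API."""

    # Mirrors the meaning of job-result-store.delete-on-commit (assumed here).
    delete_on_commit: bool = True
    # Maps job id -> "dirty" (cleanup pending) or "clean" (cleanup done).
    entries: dict = field(default_factory=dict)

    def create_dirty_result(self, job_id: str) -> None:
        # Recorded when the job reaches a terminal state.
        self.entries[job_id] = "dirty"

    def mark_as_clean(self, job_id: str) -> None:
        # Called after the job's HA data has been cleaned up.
        if self.delete_on_commit:
            # The entry is dropped, so nothing remembers that the job ran.
            self.entries.pop(job_id, None)
        else:
            # The entry is kept; the user has to remove it eventually.
            self.entries[job_id] = "clean"

    def has_entry(self, job_id: str) -> bool:
        return job_id in self.entries


def should_resubmit_after_failover(store: InMemoryJobResultStore, job_id: str) -> bool:
    # Without an entry, the recovered cluster cannot tell that the job
    # already finished, so it would submit the job again.
    return not store.has_entry(job_id)


if __name__ == "__main__":
    job_id = "0" * 32
    for delete_on_commit in (True, False):
        store = InMemoryJobResultStore(delete_on_commit=delete_on_commit)
        store.create_dirty_result(job_id)
        store.mark_as_clean(job_id)
        print(delete_on_commit, should_resubmit_after_failover(store, job_id))
```

In this model, keeping the entry is what prevents the resubmission, which is also why the cleanup responsibility moves to the user.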

[~gyfora], can you confirm my observation, or am I missing something?


was (Author: mapohl):
Hi [~gyfora], thanks for your patience. I was off the keyboard for two weeks. 
Thanks for jumping in [~wangyang0918]. As far as I understand, you're trying to 
avoid the job to be restarted after a JobManager failover after the job 
terminated in an Application Cluster. That's the exact use-case for which 
{{job-result-store.delete-on-commit}} was introduced (see FLINK-11813).

The application cluster failover requires a JRS entry to be around to know 
whether the job was already started and finished (and, therefore, doesn't need 
to be resubmitted). In that case, the user takes over the responsibility to 
clean up the JobResultStore entry as [~wangyang0918] pointed out. So far, it 
doesn't look like we're observing unexpected behavior here.

[~gyfora] may you confirm my observation or do I miss something?

> Terminated Flink job restarted from empty state when 
> execution.shutdown-on-application-finish is false
> ------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-27569
>                 URL: https://issues.apache.org/jira/browse/FLINK-27569
>             Project: Flink
>          Issue Type: Bug
>          Components: Deployment / Kubernetes, Runtime / Checkpointing
>    Affects Versions: 1.15.0
>            Reporter: Gyula Fora
>            Priority: Critical
>         Attachments: Screenshot 2022-05-11 at 08.46.51.png, Screenshot 
> 2022-05-11 at 08.50.03.png
>
>
> When JobManager HA is enabled and execution.shutdown-on-application-finish = 
> false, terminated jobs (failed, cancelled, etc.) will be resubmitted from a 
> completely empty state on JobManager failover.
> Please see the following situation. Flink 1.15, HA enabled, shutdown on app 
> finish off:
> 1. Submit Flink application cluster
> 2. Call cancel with savepoint -> see logs below
> The job successfully finishes with a savepoint:
> {noformat}
> 2022-05-11 06:42:48,562 INFO  
> org.apache.flink.runtime.dispatcher.StandaloneDispatcher     [] - Job 
> 00000000000000000000000000000000 reached terminal state FINISHED.
> 2022-05-11 06:42:48,624 INFO  
> org.apache.flink.runtime.dispatcher.StandaloneDispatcher     [] - Job 
> 00000000000000000000000000000000 has been registered for cleanup in the 
> JobResultStore after reaching a terminal state.
> 2022-05-11 06:42:48,626 INFO  org.apache.flink.runtime.jobmaster.JobMaster    
>              [] - Stopping the JobMaster for job 'State machine job' 
> (00000000000000000000000000000000).
> 2022-05-11 06:42:48,629 INFO  
> org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore [] - 
> Shutting down
> 2022-05-11 06:42:48,647 INFO  
> org.apache.flink.kubernetes.highavailability.KubernetesCheckpointIDCounter [] 
> - Shutting down.
> 2022-05-11 06:42:48,647 INFO  
> org.apache.flink.kubernetes.highavailability.KubernetesCheckpointIDCounter [] 
> - Removing counter from ConfigMap 
> basic-checkpoint-ha-example-00000000000000000000000000000000-config-map
> 2022-05-11 06:42:48,652 INFO  
> org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool [] - 
> Releasing slot [0cdb18eefcb2133049223214d4716fa0].
> 2022-05-11 06:42:48,653 INFO  
> org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool [] - 
> Releasing slot [bf5ece74692d786f6ba2b067c76ee1d9].
> 2022-05-11 06:42:48,653 INFO  org.apache.flink.runtime.jobmaster.JobMaster    
>              [] - Close ResourceManager connection 
> 220ea961c86ea8042fde2151fd05a5c9: Stopping JobMaster for job 'State machine 
> job' (00000000000000000000000000000000).
> 2022-05-11 06:42:48,653 INFO  
> org.apache.flink.runtime.leaderretrieval.DefaultLeaderRetrievalService [] - 
> Stopping DefaultLeaderRetrievalService.
> 2022-05-11 06:42:48,653 INFO  
> org.apache.flink.kubernetes.highavailability.KubernetesLeaderRetrievalDriver 
> [] - Stopping 
> KubernetesLeaderRetrievalDriver{configMapName='basic-checkpoint-ha-example-cluster-config-map'}.
> 2022-05-11 06:42:48,655 INFO  
> org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMapSharedInformer
>  [] - Stopped to watch for 
> default/basic-checkpoint-ha-example-cluster-config-map, watching 
> id:9a1bc36b-6a76-4970-96a0-945e9a12b66d
> 2022-05-11 06:42:48,655 INFO  
> org.apache.flink.runtime.leaderretrieval.DefaultLeaderRetrievalService [] - 
> Stopping DefaultLeaderRetrievalService.
> 2022-05-11 06:42:48,655 INFO  
> org.apache.flink.kubernetes.highavailability.KubernetesLeaderRetrievalDriver 
> [] - Stopping 
> KubernetesLeaderRetrievalDriver{configMapName='basic-checkpoint-ha-example-cluster-config-map'}.
> 2022-05-11 06:42:48,655 INFO  
> org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMapSharedInformer
>  [] - Stopped to watch for 
> default/basic-checkpoint-ha-example-cluster-config-map, watching 
> id:5facec4c-d888-43b4-88d0-d1f34912d35a
> 2022-05-11 06:42:48,655 INFO  
> org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - 
> Disconnect job manager 
> 969eeac09f5cf4813103003495204...@akka.tcp://flink@172.17.0.6:6123/user/rpc/jobmanager_2
>  for job 00000000000000000000000000000000 from the resource manager.
> 2022-05-11 06:42:48,660 INFO  
> org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService [] - 
> Stopping DefaultLeaderElectionService.
> 2022-05-11 06:42:48,723 INFO  
> org.apache.flink.kubernetes.highavailability.KubernetesMultipleComponentLeaderElectionHaServices
>  [] - Clean up the high availability data for job 
> 00000000000000000000000000000000.
> 2022-05-11 06:42:48,753 INFO  
> org.apache.flink.runtime.jobmanager.DefaultJobGraphStore     [] - Removed job 
> graph 00000000000000000000000000000000 from 
> KubernetesStateHandleStore{configMapName='basic-checkpoint-ha-example-cluster-config-map'}.
> 2022-05-11 06:42:48,758 INFO  
> org.apache.flink.kubernetes.highavailability.KubernetesMultipleComponentLeaderElectionHaServices
>  [] - Finished cleaning up the high availability data for job 
> 00000000000000000000000000000000.
> 2022-05-11 06:42:50,321 INFO  
> org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap 
> [] - Application completed SUCCESSFULLY{noformat}
> !Screenshot 2022-05-11 at 08.46.51.png|width=882,height=106!
> 3. Trigger JobManager failover
> The JobManager recovers, but resubmits the job from an empty state:
> {noformat}
> 2022-05-11 06:48:04,535 INFO  
> org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] 
> - Job 00000000000000000000000000000000 is submitted.
> 2022-05-11 06:48:04,535 INFO  
> org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] 
> - Submitting Job with JobId=00000000000000000000000000000000.
> 2022-05-11 06:48:04,629 INFO  
> org.apache.flink.kubernetes.KubernetesResourceManagerDriver  [] - Recovered 0 
> pods from previous attempts, current attempt id is 1.
> 2022-05-11 06:48:04,629 INFO  
> org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - 
> Recovered 0 workers from previous attempt.
> 2022-05-11 06:48:04,650 INFO  
> org.apache.flink.runtime.dispatcher.StandaloneDispatcher     [] - Received 
> JobGraph submission 'State machine job' (00000000000000000000000000000000).
> 2022-05-11 06:48:04,652 INFO  
> org.apache.flink.runtime.dispatcher.StandaloneDispatcher     [] - Submitting 
> job 'State machine job' (00000000000000000000000000000000).
> 2022-05-11 06:48:04,746 INFO  
> org.apache.flink.runtime.jobmanager.DefaultJobGraphStore     [] - Added 
> JobGraph(jobId: 00000000000000000000000000000000) to 
> KubernetesStateHandleStore{configMapName='basic-checkpoint-ha-example-cluster-config-map'}.
> 2022-05-11 06:48:04,826 INFO  
> org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService [] - 
> Starting DefaultLeaderElectionService with 
> org.apache.flink.runtime.leaderelection.MultipleComponentLeaderElectionDriverAdapter@370a1b27.
> 2022-05-11 06:48:04,838 INFO  
> org.apache.flink.runtime.rpc.akka.AkkaRpcService             [] - Starting 
> RPC endpoint for org.apache.flink.runtime.jobmaster.JobMaster at 
> akka://flink/user/rpc/jobmanager_2 .
> 2022-05-11 06:48:04,843 INFO  org.apache.flink.runtime.jobmaster.JobMaster    
>              [] - Initializing job 'State machine job' 
> (00000000000000000000000000000000).
> 2022-05-11 06:48:04,926 INFO  org.apache.flink.runtime.jobmaster.JobMaster    
>              [] - Using restart back off time strategy 
> FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=2147483647, 
> backoffTimeMS=1000) for State machine job (00000000000000000000000000000000).
> 2022-05-11 06:48:04,955 INFO  
> org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStoreUtils [] - 
> Recovering checkpoints from 
> KubernetesStateHandleStore{configMapName='basic-checkpoint-ha-example-00000000000000000000000000000000-config-map'}.
> 2022-05-11 06:48:04,959 INFO  
> org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStoreUtils [] - 
> Found 0 checkpoints in 
> KubernetesStateHandleStore{configMapName='basic-checkpoint-ha-example-00000000000000000000000000000000-config-map'}.
> 2022-05-11 06:48:04,959 INFO  
> org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStoreUtils [] - 
> Trying to fetch 0 checkpoints from storage.
> 2022-05-11 06:48:04,974 INFO  org.apache.flink.runtime.jobmaster.JobMaster    
>              [] - Running initialization on master for job State machine job 
> (00000000000000000000000000000000).
> 2022-05-11 06:48:04,974 INFO  org.apache.flink.runtime.jobmaster.JobMaster    
>              [] - Successfully ran initialization on master in 0 ms.
> 2022-05-11 06:48:05,032 INFO  
> org.apache.flink.runtime.scheduler.adapter.DefaultExecutionTopology [] - 
> Built 1 new pipelined regions in 0 ms, total 1 pipelined regions currently.
> 2022-05-11 06:48:05,035 INFO  org.apache.flink.runtime.jobmaster.JobMaster    
>              [] - No state backend has been configured, using default 
> (HashMap) org.apache.flink.runtime.state.hashmap.HashMapStateBackend@670a312c
> 2022-05-11 06:48:05,035 INFO  
> org.apache.flink.runtime.state.StateBackendLoader            [] - State 
> backend loader loads the state backend as HashMapStateBackend
> 2022-05-11 06:48:05,036 INFO  org.apache.flink.runtime.jobmaster.JobMaster    
>              [] - Checkpoint storage is set to 'jobmanager'
> 2022-05-11 06:48:05,053 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - No 
> checkpoint found during restore.
> 2022-05-11 06:48:05,058 INFO  org.apache.flink.runtime.jobmaster.JobMaster    
>              [] - Using failover strategy 
> org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy@bdfaf5f
>  for State machine job (00000000000000000000000000000000).
> 2022-05-11 06:48:05,065 INFO  
> org.apache.flink.runtime.leaderretrieval.DefaultLeaderRetrievalService [] - 
> Starting DefaultLeaderRetrievalService with 
> KubernetesLeaderRetrievalDriver{configMapName='basic-checkpoint-ha-example-cluster-config-map'}.
> 2022-05-11 06:48:05,066 INFO  
> org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMapSharedInformer
>  [] - Starting to watch for 
> default/basic-checkpoint-ha-example-cluster-config-map, watching 
> id:83411d91-3094-46c5-b2cc-0576bf5cc161
> 2022-05-11 06:48:05,126 INFO  org.apache.flink.runtime.jobmaster.JobMaster    
>              [] - Starting execution of job 'State machine job' 
> (00000000000000000000000000000000) under job master id 
> 9c63401786b3856e5c8a0cf069e44198.
> 2022-05-11 06:48:05,132 INFO  org.apache.flink.runtime.jobmaster.JobMaster    
>              [] - Starting scheduling with scheduling strategy 
> [org.apache.flink.runtime.scheduler.strategy.PipelinedRegionSchedulingStrategy]
> {noformat}
> !Screenshot 2022-05-11 at 08.50.03.png|width=861,height=79!
>  
> In addition, the checkpoint history is also lost (which is probably the main 
> cause of the issue).



--
This message was sent by Atlassian Jira
(v8.20.7#820007)
