[ 
https://issues.apache.org/jira/browse/FLINK-37483?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17936886#comment-17936886
 ] 

Max Feng commented on FLINK-37483:
----------------------------------

Here are the logs from job submission through to cleanup.

These logs, and the earlier stack trace, are from the initial FAILED state (step 2 in the description).

{code:java}
{"@timestamp":"2025-03-18T20:27:26.951Z","@version":"1","message":"Job 
00000000000000000000000000000000 is 
submitted.","logger_name":"org.apache.flink.client.deployment.application.executors.EmbeddedExecutor","thread_name":"flink-akka.actor.default-dispatcher-14","level":"INFO","level_value":20000}
{"@timestamp":"2025-03-18T20:27:26.952Z","@version":"1","message":"Submitting 
Job with 
JobId=00000000000000000000000000000000.","logger_name":"org.apache.flink.client.deployment.application.executors.EmbeddedExecutor","thread_name":"flink-akka.actor.default-dispatcher-14","level":"INFO","level_value":20000}
{"@timestamp":"2025-03-18T20:27:32.678Z","@version":"1","message":"Received 
JobGraph submission '<job name removed>' 
(00000000000000000000000000000000).","logger_name":"org.apache.flink.runtime.dispatcher.StandaloneDispatcher","thread_name":"flink-akka.actor.default-dispatcher-18","level":"INFO","level_value":20000}
{"@timestamp":"2025-03-18T20:27:32.743Z","@version":"1","message":"Submitting 
job '<job name removed>' 
(00000000000000000000000000000000).","logger_name":"org.apache.flink.runtime.dispatcher.StandaloneDispatcher","thread_name":"flink-akka.actor.default-dispatcher-18","level":"INFO","level_value":20000}
{"@timestamp":"2025-03-18T20:27:32.821Z","@version":"1","message":"Unable to 
find 'mime.types' file in 
classpath","logger_name":"com.amazonaws.services.s3.internal.Mimetypes","thread_name":"s3a-transfer-shared-pool1-t3","level":"WARN","level_value":30000}
{"@timestamp":"2025-03-18T20:27:33.186Z","@version":"1","message":"Added 
JobGraph(jobId: 00000000000000000000000000000000) to 
KubernetesStateHandleStore{configMapName='<job name 
removed>-cluster-config-map'}.","logger_name":"org.apache.flink.runtime.jobmanager.DefaultJobGraphStore","thread_name":"flink-akka.actor.default-dispatcher-18","level":"INFO","level_value":20000}
{"@timestamp":"2025-03-18T20:27:33.324Z","@version":"1","message":"Initializing 
job '<job name removed>' 
(00000000000000000000000000000000).","logger_name":"org.apache.flink.runtime.jobmaster.JobMaster","thread_name":"jobmanager-io-thread-1","level":"INFO","level_value":20000}
{"@timestamp":"2025-03-18T20:27:33.337Z","@version":"1","message":"Using 
restart back off time strategy 
ExponentialDelayRestartBackoffTimeStrategy(initialBackoffMS=1000, 
maxBackoffMS=900000, backoffMultiplier=2.0, resetBackoffThresholdMS=600000, 
jitterFactor=0.1, currentBackoffMS=1000, lastFailureTimestamp=0) for <job name 
removed> 
(00000000000000000000000000000000).","logger_name":"org.apache.flink.runtime.jobmaster.JobMaster","thread_name":"jobmanager-io-thread-1","level":"INFO","level_value":20000}
{"@timestamp":"2025-03-18T20:27:33.356Z","@version":"1","message":"Recovering 
checkpoints from KubernetesStateHandleStore{configMapName='<job name 
removed>-00000000000000000000000000000000-config-map'}.","logger_name":"org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStoreUtils","thread_name":"jobmanager-io-thread-1","level":"INFO","level_value":20000}
{"@timestamp":"2025-03-18T20:27:33.415Z","@version":"1","message":"Found 1 
checkpoints in KubernetesStateHandleStore{configMapName='<job name 
removed>-00000000000000000000000000000000-config-map'}.","logger_name":"org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStoreUtils","thread_name":"jobmanager-io-thread-1","level":"INFO","level_value":20000}
{"@timestamp":"2025-03-18T20:27:33.416Z","@version":"1","message":"Trying to 
fetch 1 checkpoints from 
storage.","logger_name":"org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStoreUtils","thread_name":"jobmanager-io-thread-1","level":"INFO","level_value":20000}
{"@timestamp":"2025-03-18T20:27:33.416Z","@version":"1","message":"Trying to 
retrieve checkpoint 
2134.","logger_name":"org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStoreUtils","thread_name":"jobmanager-io-thread-1","level":"INFO","level_value":20000}
{"@timestamp":"2025-03-18T20:27:33.638Z","@version":"1","message":"Running 
initialization on master for job <job name removed> 
(00000000000000000000000000000000).","logger_name":"org.apache.flink.runtime.jobmaster.JobMaster","thread_name":"jobmanager-io-thread-1","level":"INFO","level_value":20000}
{"@timestamp":"2025-03-18T20:27:33.639Z","@version":"1","message":"Successfully 
ran initialization on master in 0 
ms.","logger_name":"org.apache.flink.runtime.jobmaster.JobMaster","thread_name":"jobmanager-io-thread-1","level":"INFO","level_value":20000}
{"@timestamp":"2025-03-18T20:27:33.749Z","@version":"1","message":"Built 1 new 
pipelined regions in 0 ms, total 1 pipelined regions 
currently.","logger_name":"org.apache.flink.runtime.scheduler.adapter.DefaultExecutionTopology","thread_name":"jobmanager-io-thread-1","level":"INFO","level_value":20000}
{"@timestamp":"2025-03-18T20:27:33.754Z","@version":"1","message":"Using 
job/cluster config to configure application-defined state backend: 
EmbeddedRocksDBStateBackend{, localRocksDbDirectories=null, 
enableIncrementalCheckpointing=TRUE, numberOfTransferThreads=4, 
writeBatchSize=2097152}","logger_name":"org.apache.flink.runtime.jobmaster.JobMaster","thread_name":"jobmanager-io-thread-1","level":"INFO","level_value":20000}
{"@timestamp":"2025-03-18T20:27:33.754Z","@version":"1","message":"Using 
predefined options: 
FLASH_SSD_OPTIMIZED.","logger_name":"org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend","thread_name":"jobmanager-io-thread-1","level":"INFO","level_value":20000}
{"@timestamp":"2025-03-18T20:27:33.754Z","@version":"1","message":"Using 
application-defined options factory: 
DefaultConfigurableOptionsFactory{configuredOptions={}}.","logger_name":"org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend","thread_name":"jobmanager-io-thread-1","level":"INFO","level_value":20000}
{"@timestamp":"2025-03-18T20:27:33.754Z","@version":"1","message":"DefaultConfigurableOptionsFactory{configuredOptions={}}
 is extending from 
org.apache.flink.contrib.streaming.state.DefaultConfigurableOptionsFactory, 
which is deprecated and will be removed in the future. It is highly recommended 
to directly implement the ConfigurableRocksDBOptionsFactory without extending 
the org.apache.flink.contrib.streaming.state.DefaultConfigurableOptionsFactory. 
For more information, please refer to 
FLINK-24046.","logger_name":"org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend","thread_name":"jobmanager-io-thread-1","level":"WARN","level_value":30000}
{"@timestamp":"2025-03-18T20:27:33.755Z","@version":"1","message":"Using 
application-defined state backend: EmbeddedRocksDBStateBackend{, 
localRocksDbDirectories=null, enableIncrementalCheckpointing=TRUE, 
numberOfTransferThreads=4, 
writeBatchSize=2097152}","logger_name":"org.apache.flink.runtime.jobmaster.JobMaster","thread_name":"jobmanager-io-thread-1","level":"INFO","level_value":20000}
{"@timestamp":"2025-03-18T20:27:33.755Z","@version":"1","message":"State 
backend loader loads the state backend as 
EmbeddedRocksDBStateBackend","logger_name":"org.apache.flink.runtime.state.StateBackendLoader","thread_name":"jobmanager-io-thread-1","level":"INFO","level_value":20000}
{"@timestamp":"2025-03-18T20:27:33.756Z","@version":"1","message":"Using 
job/cluster config to configure application-defined checkpoint storage: 
org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage@3b6d8e7e","logger_name":"org.apache.flink.runtime.jobmaster.JobMaster","thread_name":"jobmanager-io-thread-1","level":"INFO","level_value":20000}
{"@timestamp":"2025-03-18T20:27:33.756Z","@version":"1","message":"Config 
option 'state.checkpoint-storage' is ignored because the checkpoint storage 
passed via StreamExecutionEnvironment takes 
precedence.","logger_name":"org.apache.flink.runtime.jobmaster.JobMaster","thread_name":"jobmanager-io-thread-1","level":"WARN","level_value":30000}
{"@timestamp":"2025-03-18T20:27:34.419Z","@version":"1","message":"Restoring 
job 00000000000000000000000000000000 from Checkpoint 2134 @ 1742217822990 for 
00000000000000000000000000000000 located at s3://<path 
removed>.","logger_name":"org.apache.flink.runtime.checkpoint.CheckpointCoordinator","thread_name":"jobmanager-io-thread-1","level":"INFO","level_value":20000}
{"@timestamp":"2025-03-18T20:27:34.435Z","@version":"1","message":"Job 
00000000000000000000000000000000 reached terminal state 
FAILED.\norg.apache.flink.runtime.client.JobInitializationException: Could not 
start the JobMaster.\n\tat 
org.apache.flink.runtime.jobmaster.DefaultJobMasterServiceProcess.lambda$new$0(DefaultJobMasterServiceProcess.java:97)\n\tat
 
java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)\n\tat
 
java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)\n\tat
 
java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)\n\tat
 
java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1769)\n\tat
 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)\n\tat
 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630)\n\tat
 java.base/java.lang.Thread.run(Thread.java:831)\nCaused by: 
java.util.concurrent.CompletionException: java.lang.IllegalStateException: 
There is no operator for the state d844df5f5a78206dad1efc5e0318dcb5\n\tat 
java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:314)\n\tat
 
java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:319)\n\tat
 
java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1766)\n\t...
 3 more\nCaused by: java.lang.IllegalStateException: There is no operator for 
the state d844df5f5a78206dad1efc5e0318dcb5\n\tat 
org.apache.flink.runtime.checkpoint.StateAssignmentOperation.checkStateMappingCompleteness(StateAssignmentOperation.java:733)\n\tat
 
org.apache.flink.runtime.checkpoint.StateAssignmentOperation.assignStates(StateAssignmentOperation.java:98)\n\tat
 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreLatestCheckpointedStateInternal(CheckpointCoordinator.java:1670)\n\tat
 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreInitialCheckpointIfPresent(CheckpointCoordinator.java:1598)\n\tat
 
org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory.createAndRestoreExecutionGraph(DefaultExecutionGraphFactory.java:177)\n\tat
 
org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:363)\n\tat
 
org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:208)\n\tat
 
org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:191)\n\tat
 
org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:139)\n\tat
 
org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:135)\n\tat
 
org.apache.flink.runtime.jobmaster.DefaultSlotPoolServiceSchedulerFactory.createScheduler(DefaultSlotPoolServiceSchedulerFactory.java:115)\n\tat
 
org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:345)\n\tat
 org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:322)\n\tat 
org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.internalCreateJobMasterService(DefaultJobMasterServiceFactory.java:106)\n\tat
 
org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.lambda$createJobMasterService$0(DefaultJobMasterServiceFactory.java:94)\n\tat
 
org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:112)\n\tat
 
java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1764)\n\t...
 3 
more","logger_name":"org.apache.flink.runtime.dispatcher.StandaloneDispatcher","thread_name":"flink-akka.actor.default-dispatcher-18","level":"INFO","level_value":20000}
{"@timestamp":"2025-03-18T20:27:34.840Z","@version":"1","message":"Job 
00000000000000000000000000000000 has been registered for cleanup in the 
JobResultStore after reaching a terminal 
state.","logger_name":"org.apache.flink.runtime.dispatcher.StandaloneDispatcher","thread_name":"cluster-io-thread-4","level":"INFO","level_value":20000}
{"@timestamp":"2025-03-18T20:27:34.843Z","@version":"1","message":"Stopping 
DefaultLeaderElectionService.","logger_name":"org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService","thread_name":"flink-akka.actor.default-dispatcher-18","level":"INFO","level_value":20000}
{"@timestamp":"2025-03-18T20:27:34.926Z","@version":"1","message":"Clean up the 
high availability data for job 
00000000000000000000000000000000.","logger_name":"org.apache.flink.kubernetes.highavailability.KubernetesMultipleComponentLeaderElectionHaServices","thread_name":"cluster-io-thread-3","level":"INFO","level_value":20000}
{code}
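
To make the ordering easier to see, here is a minimal sketch that computes how long after the FAILED transition the HA cleanup runs. It assumes each JSON log entry has been rejoined onto a single line. The three entries are trimmed copies of entries from the log above (the stack trace and unused fields are dropped), and the helper names `parse_ts` and `first_match` are made up for this illustration.

```python
import json
from datetime import datetime

# Trimmed copies of three entries from the log above (only the fields used here;
# the stack trace that follows "FAILED." in the original message is omitted).
LOG_LINES = [
    '{"@timestamp":"2025-03-18T20:27:34.435Z","message":"Job 00000000000000000000000000000000 reached terminal state FAILED."}',
    '{"@timestamp":"2025-03-18T20:27:34.840Z","message":"Job 00000000000000000000000000000000 has been registered for cleanup in the JobResultStore after reaching a terminal state."}',
    '{"@timestamp":"2025-03-18T20:27:34.926Z","message":"Clean up the high availability data for job 00000000000000000000000000000000."}',
]


def parse_ts(value):
    """Parse timestamps of the form 2025-03-18T20:27:34.435Z."""
    return datetime.strptime(value, "%Y-%m-%dT%H:%M:%S.%fZ")


def first_match(entries, needle):
    """Return the timestamp of the first entry whose message contains needle."""
    for entry in entries:
        if needle in entry["message"]:
            return parse_ts(entry["@timestamp"])
    return None


entries = [json.loads(line) for line in LOG_LINES]
failed_at = first_match(entries, "reached terminal state FAILED")
cleanup_at = first_match(entries, "Clean up the high availability data")
gap_ms = round((cleanup_at - failed_at).total_seconds() * 1000)
print(f"FAILED -> HA cleanup: {gap_ms} ms")
```

For these entries the HA data is cleaned up 491 ms after the job reaches FAILED, before the pod has been restarted.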


> Native kubernetes clusters losing checkpoint state on FAILED
> ------------------------------------------------------------
>
>                 Key: FLINK-37483
>                 URL: https://issues.apache.org/jira/browse/FLINK-37483
>             Project: Flink
>          Issue Type: Bug
>          Components: Deployment / Kubernetes
>    Affects Versions: 1.20.1
>            Reporter: Max Feng
>            Priority: Major
>
> We're running Flink 1.20 in native Kubernetes application-mode clusters, and
> we're running into an issue where clusters restart without the checkpoints
> referenced in the HA configmaps.
> To the best of our understanding, here's what's happening:
> 1) We're running application-mode clusters in native Kubernetes with
> externalized checkpoints, retained on cancellation. We're attempting to
> restore a job from a checkpoint; the checkpoint reference is held in the
> Kubernetes HA configmap.
> 2) The JobManager encounters an issue during startup, and the job goes to
> state FAILED.
> 3) The HA configmap containing the checkpoint reference is cleaned up.
> 4) The Kubernetes pod exits. Because it is a Kubernetes deployment, the pod
> is immediately restarted.
> 5) Upon restart, the new JobManager finds no checkpoints to restore from.
> We think this is a bad combination of the following behaviors:
> * FAILED triggers cleanup, which cleans up HA configmaps in native Kubernetes
> mode
> * FAILED does not actually stop a job in native Kubernetes mode; instead, it
> is immediately retried
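
The interaction described in steps 1 to 5 can be sketched as a toy model. This is only an illustration of the reported sequence, not Flink's implementation: the `HaConfigMap` and `Cluster` classes and the `start_job` method are hypothetical, and the only value taken from the report is checkpoint ID 2134.

```python
# Toy model only: these classes are hypothetical and do not mirror Flink's code.
class HaConfigMap:
    """Stands in for the Kubernetes HA configmap holding checkpoint references."""

    def __init__(self):
        self.checkpoints = []


class Cluster:
    """Stands in for one JobManager pod lifetime."""

    def __init__(self, config_map):
        self.config_map = config_map

    def start_job(self, startup_fails):
        # Steps 1 and 5: look up the checkpoints available for restore.
        restored = list(self.config_map.checkpoints)
        if startup_fails:
            # Steps 2 and 3: the job goes to FAILED, which triggers HA cleanup.
            self.config_map.checkpoints.clear()
            return "FAILED", restored
        return "RUNNING", restored


config_map = HaConfigMap()
config_map.checkpoints.append(2134)

# First pod: startup fails, so the checkpoint reference is cleaned up.
first = Cluster(config_map).start_job(startup_fails=True)
# Step 4: the deployment restarts the pod, which now finds nothing to restore.
second = Cluster(config_map).start_job(startup_fails=False)
print(first)   # ('FAILED', [2134])
print(second)  # ('RUNNING', [])
```

In this model the second run starts successfully but with an empty checkpoint list, which matches the symptom reported in step 5.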



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
