[ https://issues.apache.org/jira/browse/FLINK-34451?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17819410#comment-17819410 ]
Alex Hoffer commented on FLINK-34451:
-------------------------------------

# Here is my FlinkDeployment:
{code:yaml}
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: kafka
  namespace: flink
spec:
  image: [redacted]
  flinkVersion: v1_18
  restartNonce: 25
  flinkConfiguration:
    taskmanager.rpc.port: "50100"
    taskmanager.numberOfTaskSlots: "1"
    blob.server.port: "6124"
    jobmanager.memory.process.size: "null"
    taskmanager.memory.process.size: "2gb"
    high-availability.type: kubernetes
    high-availability.storageDir: abfss://job-result-store@[redacted].dfs.core.windows.net/kafka
    state.checkpoints.dir: abfss://checkpoints@[redacted].dfs.core.windows.net/kafka
    execution.checkpointing.interval: "30000"
    execution.checkpointing.mode: EXACTLY_ONCE
    state.checkpoint-storage: filesystem
    state.savepoints.dir: abfss://savepoints@[redacted].dfs.core.windows.net/kafka
    state.backend.type: rocksdb
    state.backend.incremental: "true"
    state.backend.rocksdb.localdir: /rocksdb
    fs.azure.account.auth.type: OAuth
    fs.azure.account.oauth.provider.type: org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider
    fs.azure.account.oauth2.client.endpoint: [redacted]
    fs.azure.account.oauth2.client.id: [redacted]
    # Fix for hadoop-azure bug that buffers checkpoint blocks on disk rather than in memory: https://issues.apache.org/jira/browse/HADOOP-18707
    fs.azure.data.blocks.buffer: array
    restart-strategy.type: exponentialdelay
    job.autoscaler.enabled: "true"
    job.autoscaler.stabilization.interval: 2m
    job.autoscaler.metrics.window: 1m
    job.autoscaler.target.utilization: "0.6"
    job.autoscaler.target.utilization.boundary: "0.2"
    job.autoscaler.restart.time: 1m
    job.autoscaler.catch-up.duration: 1m
    job.autoscaler.scale-up.grace-period: 10m
    jobmanager.scheduler: adaptive
    pipeline.max-parallelism: "12"
    job.autoscaler.vertex.max-parallelism: "5"
  serviceAccount: flink
  jobManager:
    replicas: 2
    resource:
      memory: "2gb"
      cpu: 1
    podTemplate:
      spec:
        affinity:
          podAntiAffinity:
            preferredDuringSchedulingIgnoredDuringExecution:
              - podAffinityTerm:
                  labelSelector:
                    matchExpressions:
                      - key: app
                        operator: In
                        values:
                          - kafka
                      - key: component
                        operator: In
                        values:
                          - jobmanager
                  topologyKey: failure-domain.beta.kubernetes.io/zone
                weight: 10
        containers:
          - name: flink-main-container
            resources:
              limits:
                ephemeral-storage: 1Gi
              requests:
                ephemeral-storage: 1Gi
  taskManager:
    resource:
      memory: "2gb"
      cpu: 1
    podTemplate:
      spec:
        affinity:
          podAntiAffinity:
            preferredDuringSchedulingIgnoredDuringExecution:
              - podAffinityTerm:
                  labelSelector:
                    matchExpressions:
                      - key: app
                        operator: In
                        values:
                          - kafka
                      - key: component
                        operator: In
                        values:
                          - taskmanager
                  topologyKey: failure-domain.beta.kubernetes.io/zone
                weight: 10
        containers:
          - name: flink-main-container
            resources:
              limits:
                ephemeral-storage: 2Gi
              requests:
                ephemeral-storage: 2Gi
            volumeMounts:
              - mountPath: /rocksdb
                name: rocksdb
        volumes:
          - name: rocksdb
            emptyDir:
              sizeLimit: 1Gi
  podTemplate:
    spec:
      containers:
        - name: flink-main-container
          ports:
            - containerPort: 9250
              name: metrics
              protocol: TCP
  job:
    entryClass: "org.apache.flink.client.python.PythonDriver"
    args: ["-pyclientexec", "/usr/bin/python", "-py", "/opt/flink/usrlib/kafka-k6.py", "--kubernetes", "--fivemin_bytessent_stream", "--kafka_bootstrap_ip", "10.177.1.26"]
    upgradeMode: savepoint
    parallelism: 1
{code}
# Yes, I can recreate this scenario each time I try.
# My ticket was mistaken; this was found on operator version 1.7.0 (I will update the ticket). I just recreated it on the latest Flink Operator image available (37ca517).
# Just confirmed it also occurs on Flink 1.17.0.
# Did not occur when the adaptive scheduler was turned off! In step 5 of the reproduction steps, the job correctly fell back to the last checkpoint (see the config sketch below). *This suggests it may be related to the adaptive scheduler setting.*
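To make that comparison concrete, here is a minimal sketch of the scheduler toggle, assuming the failing and working runs differ only in the standard {{jobmanager.scheduler}} option in {{flinkConfiguration}} and everything else in the FlinkDeployment above stays the same:
{code:yaml}
# Minimal sketch of the scheduler toggle (assumption: no other change to the
# FlinkDeployment above is needed for the comparison).
spec:
  flinkConfiguration:
    # Adaptive scheduler on: the misleading savepoint fallback was observed.
    jobmanager.scheduler: adaptive
    # Adaptive scheduler off: the job fell back to the latest checkpoint.
    # Either set the default scheduler explicitly or remove the key entirely:
    # jobmanager.scheduler: default
{code}
Leaving the option unset has the same effect as setting it to {{default}}, since Flink uses the default scheduler when none is configured.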
> [Kubernetes Operator] Job with restarting TaskManagers uses wrong/misleading fallback approach
> -----------------------------------------------------------------------------------------------
>
>                  Key: FLINK-34451
>                  URL: https://issues.apache.org/jira/browse/FLINK-34451
>              Project: Flink
>           Issue Type: Bug
>           Components: Kubernetes Operator
>     Affects Versions: kubernetes-operator-1.6.1
>          Environment: Operator version: 1.6.1
>                       Flink version 1.18.0
>                       HA JobManagers
>                       Adaptive scheduler mode using the operator's autoscaler
>                       Checkpointing at an interval of 60s
>                       Upgrade mode savepoint
>             Reporter: Alex Hoffer
>             Priority: Major
>
> We had a situation where TaskManagers were constantly restarting from OOM. We're using the adaptive scheduler with the Kubernetes Operator and a restart strategy of exponential backoff, so the JobManagers remained alive. We're also using the savepoint upgrade mode.
>
> When we tried to remedy the situation by raising the direct memory allocation for the pods, we were surprised that Flink restored from the last savepoint taken rather than from the latest checkpoint. This was unfortunate for us because, with the adaptive scheduler, the job hadn't changed in some time, so the last savepoint was 6 days old! Meanwhile, checkpoints were taken every minute up until the failure. I can confirm the HA metadata existed in the ConfigMaps and that the corresponding checkpoints existed in remote storage for the job to access. Also, no Flink version change was part of the deployment.
>
> The Operator logs reported that it was using last-state recovery in this situation:
> {code:java}
> 2024-02-15 19:38:38,252 o.a.f.k.o.l.AuditUtils [INFO ][job-name] >>> Event | Info | SPECCHANGED | UPGRADE change(s) detected (Diff: FlinkDeploymentSpec[image : image:0a7c41b -> image:ebebc53, restartNonce : null -> 100]), starting reconciliation.
> 2024-02-15 19:38:38,252 o.a.f.k.o.r.d.AbstractJobReconciler [INFO ][job-name] Upgrading/Restarting running job, suspending first...
> 2024-02-15 19:38:38,260 o.a.f.k.o.r.d.ApplicationReconciler [INFO ][job-name] Job is not running but HA metadata is available for last state restore, ready for upgrade
> 2024-02-15 19:38:38,270 o.a.f.k.o.l.AuditUtils [INFO ][job-name] >>> Event | Info | SUSPENDED | Suspending existing deployment.
> 2024-02-15 19:38:38,270 o.a.f.k.o.s.NativeFlinkService [INFO ][job-name] Deleting JobManager deployment while preserving HA metadata.
> 2024-02-15 19:38:40,431 o.a.f.k.o.l.AuditUtils [INFO ][job-name] >>> Status | Info | UPGRADING | The resource is being upgraded
> 2024-02-15 19:38:40,532 o.a.f.k.o.l.AuditUtils [INFO ][job-name] >>> Event | Info | SUBMIT | Starting deployment
> 2024-02-15 19:38:40,532 o.a.f.k.o.s.AbstractFlinkService [INFO ][job-name] Deploying application cluster requiring last-state from HA metadata
> 2024-02-15 19:38:40,538 o.a.f.k.o.u.FlinkUtils [INFO ][job-name] Job graph in ConfigMap job-name-cluster-config-map is deleted
> {code}
> But when the job booted up, it reported restoring from a savepoint:
> {code:java}
> 2024-02-15 19:39:03,887 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Restoring job 522b3c363499d81ed7922aa30b13e237 from Savepoint 20207 @ 0 for 522b3c363499d81ed7922aa30b13e237 located at abfss://savepoi...@storageaccount.dfs.core.windows.net/job-name/savepoint-522b3c-8836a1edc709.
> {code}
> Our expectation was that the Operator logs were accurate and that the job would restore from the checkpoint. We had to scramble and manually restore from the checkpoint to restore function.
>
> It's also worth noting that I can recreate this issue in a testing environment. The process for doing so is:
> - Boot up HA JobManagers with checkpointing enabled and savepoint upgrade mode, using the adaptive scheduler.
> - Make a dummy change to trigger a savepoint.
> - Allow the TaskManagers to process some data and hit the checkpoint interval.
> - Cause the TaskManagers to crash. In our case, we could use up a bunch of memory in the pods and cause them to crash.
> - Observe the Operator logs saying it is restoring from last-state, but watch as the pods instead use the last savepoint.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)