[ 
https://issues.apache.org/jira/browse/FLINK-34451?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17819410#comment-17819410
 ] 

Alex Hoffer commented on FLINK-34451:
-------------------------------------

 
 # Here is my FlinkDeployment:


{code:java}
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: kafka
  namespace: flink
spec:
  image: [redacted]
  flinkVersion: v1_18
  restartNonce: 25
  flinkConfiguration:
    taskmanager.rpc.port: "50100"
    taskmanager.numberOfTaskSlots: "1"
    blob.server.port: "6124"
    jobmanager.memory.process.size: "null"
    taskmanager.memory.process.size: "2gb"
    high-availability.type: kubernetes
    high-availability.storageDir: abfss://job-result-store@[redacted].dfs.core.windows.net/kafka

    state.checkpoints.dir: abfss://checkpoints@[redacted].dfs.core.windows.net/kafka
    execution.checkpointing.interval: "30000"
    execution.checkpointing.mode: EXACTLY_ONCE
    state.checkpoint-storage: filesystem

    state.savepoints.dir: abfss://savepoints@[redacted].dfs.core.windows.net/kafka

    state.backend.type: rocksdb
    state.backend.incremental: "true"
    state.backend.rocksdb.localdir: /rocksdb

    fs.azure.account.auth.type: OAuth
    fs.azure.account.oauth.provider.type: org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider
    fs.azure.account.oauth2.client.endpoint: [redacted]
    fs.azure.account.oauth2.client.id: [redacted]

    # Work around a hadoop-azure bug that buffers checkpoint blocks on disk
    # rather than in memory: https://issues.apache.org/jira/browse/HADOOP-18707
    fs.azure.data.blocks.buffer: array

    restart-strategy.type: exponentialdelay
    job.autoscaler.enabled: "true"
    job.autoscaler.stabilization.interval: 2m
    job.autoscaler.metrics.window: 1m
    job.autoscaler.target.utilization: "0.6"
    job.autoscaler.target.utilization.boundary: "0.2"
    job.autoscaler.restart.time: 1m
    job.autoscaler.catch-up.duration: 1m
    job.autoscaler.scale-up.grace-period: 10m
    jobmanager.scheduler: adaptive
    pipeline.max-parallelism: "12"
    job.autoscaler.vertex.max-parallelism: "5"

  serviceAccount: flink
  jobManager:
    replicas: 2
    resource:
      memory: "2gb"
      cpu: 1
    podTemplate:
      spec:
        affinity:
          podAntiAffinity:
            preferredDuringSchedulingIgnoredDuringExecution:
            - podAffinityTerm:
                labelSelector:
                  matchExpressions:
                  - key: app
                    operator: In
                    values:
                      - kafka
                  - key: component
                    operator: In
                    values:
                      - jobmanager
                topologyKey: failure-domain.beta.kubernetes.io/zone
              weight: 10
        containers:
          - name: flink-main-container
            resources:
              limits:
                ephemeral-storage: 1Gi
              requests:
                ephemeral-storage: 1Gi
  taskManager:
    resource:
      memory: "2gb"
      cpu: 1
    podTemplate:
      spec:
        affinity:
          podAntiAffinity:
            preferredDuringSchedulingIgnoredDuringExecution:
            - podAffinityTerm:
                labelSelector:
                  matchExpressions:
                  - key: app
                    operator: In
                    values:
                      - kafka
                  - key: component
                    operator: In
                    values:
                      - taskmanager
                topologyKey: failure-domain.beta.kubernetes.io/zone
              weight: 10
        containers:
          - name: flink-main-container
            resources:
              limits:
                ephemeral-storage: 2Gi
              requests:
                ephemeral-storage: 2Gi
            volumeMounts:
              - mountPath: /rocksdb
                name: rocksdb
        volumes:
          - name: rocksdb
            emptyDir:
              sizeLimit: 1Gi
  podTemplate:
    spec:
      containers:
        - name: flink-main-container
          ports:
            - containerPort: 9250
              name: metrics
              protocol: TCP
  job:
    entryClass: "org.apache.flink.client.python.PythonDriver"
    args: ["-pyclientexec", "/usr/bin/python", "-py",
           "/opt/flink/usrlib/kafka-k6.py", "--kubernetes",
           "--fivemin_bytessent_stream",
           "--kafka_bootstrap_ip", "10.177.1.26"]
    upgradeMode: savepoint
    parallelism: 1
{code}

 # Yes, I can recreate this scenario each time I try.
 # My ticket was mistaken: this was found on operator version 1.7.0 (I will 
update the ticket). I also just recreated it on the latest Flink Operator 
image available (37ca517).
 # I just confirmed that it also occurs on Flink 1.17.0.
 # It did not occur when the adaptive scheduler was turned off!


In scenario 5 above (adaptive scheduler turned off), the job correctly 
restored from the last checkpoint. 
*This suggests it may be related to the adaptive scheduler setting.*
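
For reference, "adaptive scheduler turned off" in item 5 would correspond to a 
change along these lines in the flinkConfiguration posted above. This is only 
a sketch under an assumption: the exact edit is not shown in this comment, and 
removing the jobmanager.scheduler key entirely should have the same effect, 
since Flink then falls back to its default scheduler.

{code:yaml}
spec:
  flinkConfiguration:
    # Assumption: explicitly select Flink's default scheduler instead of
    # "adaptive". Deleting the key altogether should behave the same way.
    jobmanager.scheduler: Default
{code}

The sketch assumes every other setting, including upgradeMode: savepoint, 
stays as posted above.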

> [Kubernetes Operator] Job with restarting TaskManagers uses wrong/misleading 
> fallback approach
> ----------------------------------------------------------------------------------------------
>
>                 Key: FLINK-34451
>                 URL: https://issues.apache.org/jira/browse/FLINK-34451
>             Project: Flink
>          Issue Type: Bug
>          Components: Kubernetes Operator
>    Affects Versions: kubernetes-operator-1.6.1
>         Environment: Operator version: 1.6.1
> Flink version 1.18.0
> HA JobManagers
> Adaptive scheduler mode using the operator's autoscaler
> Checkpointing at an interval of 60s
> Upgrade mode savepoint
>            Reporter: Alex Hoffer
>            Priority: Major
>
>  
> We had a situation where TaskManagers were constantly restarting from OOM. 
> We're using the Adaptive scheduler with the Kubernetes Operator, and a 
> restart strategy of exponential backoff, and so the JobManagers remained 
> alive. We're also using savepoint upgrade mode. 
> When we tried to remedy the situation by raising the direct memory allocation 
> to the pods, we were surprised that Flink used the last savepoint taken, 
> rather than the checkpoint. This was unfortunate for us because we are on 
> adaptive scheduler and the job hasn't changed in some time, so this last 
> savepoint was 6 days old! Meanwhile, checkpoints were taken every minute up 
> until failure. I can confirm the HA metadata existed in the configmaps, and 
> the corresponding checkpoints existed in remote storage for it to access. 
> Plus, no Flink version changes were in the deployment.
> The Operator logs reported that it was using last-state recovery in this 
> situation:
> {code:java}
> 2024-02-15 19:38:38,252 o.a.f.k.o.l.AuditUtils         [INFO ][job-name] >>> 
> Event  | Info    | SPECCHANGED     | UPGRADE change(s) detected (Diff: 
> FlinkDeploymentSpec[image : image:0a7c41b -> image:ebebc53, restartNonce : 
> null -> 100]), starting reconciliation.
> 2024-02-15 19:38:38,252 o.a.f.k.o.r.d.AbstractJobReconciler [INFO ][job-name] 
> Upgrading/Restarting running job, suspending first...
> 2024-02-15 19:38:38,260 o.a.f.k.o.r.d.ApplicationReconciler [INFO ][job-name] 
> Job is not running but HA metadata is available for last state restore, ready 
> for upgrade
> 2024-02-15 19:38:38,270 o.a.f.k.o.l.AuditUtils         [INFO ][job-name] >>> 
> Event  | Info    | SUSPENDED       | Suspending existing deployment.
> 2024-02-15 19:38:38,270 o.a.f.k.o.s.NativeFlinkService [INFO ][job-name] 
> Deleting JobManager deployment while preserving HA metadata. 
> 2024-02-15 19:38:40,431 o.a.f.k.o.l.AuditUtils         [INFO ][job-name] >>> 
> Status | Info    | UPGRADING       | The resource is being upgraded 
> 2024-02-15 19:38:40,532 o.a.f.k.o.l.AuditUtils         [INFO ][job-name] >>> 
> Event  | Info    | SUBMIT          | Starting deployment
> 2024-02-15 19:38:40,532 o.a.f.k.o.s.AbstractFlinkService [INFO ][job-name] 
> Deploying application cluster requiring last-state from HA metadata
> 2024-02-15 19:38:40,538 o.a.f.k.o.u.FlinkUtils         [INFO ][job-name] Job 
> graph in ConfigMap job-name-cluster-config-map is deleted {code}
> But when the job booted up, it reported restoring from savepoint:
> {code:java}
> 2024-02-15 19:39:03,887 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Restoring 
> job 522b3c363499d81ed7922aa30b13e237 from Savepoint 20207 @ 0 for 
> 522b3c363499d81ed7922aa30b13e237 located at 
> abfss://savepoi...@storageaccount.dfs.core.windows.net/job-name/savepoint-522b3c-8836a1edc709.
>  {code}
> Our expectation was that the Operator logs were true, and that it would be 
> restoring from checkpoint. We had to scramble and manually restore from the 
> checkpoint to restore function.
>  
>  
> It's also worth noting I can recreate this issue in a testing environment. 
> The process for doing so is:
> - Boot up HA JobManagers with checkpoints on and savepoint upgrade mode, 
> using adaptive scheduler
> - Make a dummy change to trigger a savepoint.
> - Allow the TaskManagers to process some data and hit the checkpoint interval.
> - Cause the TaskManagers to crash. In our case, we could use up a bunch of 
> memory in the pods and cause it to crash.
> - Observe the Operator logs saying it is restoring from last-state, but watch 
> as the pods instead use the last savepoint.



