Hello,

First off, excellent work on the K8s Operator - it works extremely well.
However, I'm running into an issue I can't resolve: setting upgradeMode to
savepoint simply doesn't work. For example, suppose I submit a job with a
FlinkDeployment like so:

job:
    entryClass: "org.apache.flink.client.python.PythonDriver"
    args: ["-pyclientexec", "/usr/local/bin/python3",
           "-py", "/opt/flink/usrlib/kafka.py",
           "--jarfile", "/opt/flink/plugins/hadoop-azure/hadoop-azure-3.3.6.jar"]
    upgradeMode: savepoint
    parallelism: 2

With HA, checkpointing, and savepointing configs like the following:

high-availability.type: kubernetes
high-availability.storageDir: abfss://job-result-store@[redacted].dfs.core.windows.net/kafka
high-availability: kubernetes

state.checkpoints.dir: abfss://checkpoints@[redacted].dfs.core.windows.net/kafka

execution.checkpointing.interval: "60000"
state.backend.type: hashmap
state.checkpoint-storage: filesystem
state.savepoints.dir: abfss://savepoints@[redacted].dfs.core.windows.net/kafka

If I let the job run for a bit and then change a config value that triggers
an upgrade, I see the following events:

  Normal   SpecChanged                 3m7s                JobManagerDeployment
      UPGRADE change(s) detected (Diff: FlinkDeploymentSpec[job.upgradeMode : stateless -> savepoint,
      flinkConfiguration.restart-strategy.fixed-delay.attempts : 9 -> 10]), starting reconciliation.
  Warning  ClusterDeploymentException  25s (x7 over 3m7s)  JobManagerDeployment
      java.util.concurrent.CompletionException: java.util.concurrent.CompletionException:
      org.apache.flink.runtime.checkpoint.CheckpointException: Task has failed.

The problem never resolves; it eventually hits my restart limit and the job
is restarted. Turning on DEBUG logging doesn't show much either - it just
seems the operator cannot reliably trigger savepoints, failing with that
generic error. Note that checkpoints work perfectly well, and the last-state
upgrade mode works fine for me.
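
In case it helps narrow things down: my understanding is that a manual
savepoint can also be triggered by bumping job.savepointTriggerNonce in the
spec, which I assume exercises the same savepoint path as the upgrade. A
minimal sketch (the nonce value is arbitrary):

job:
    upgradeMode: savepoint
    # changing this value should trigger a manual savepoint for the running job
    savepointTriggerNonce: 1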

- Alex
