Hi Gyula,

After upgrading our operator version to the HEAD commit of the release-1.6
branch (
https://github.com/apache/flink-kubernetes-operator/pkgs/container/flink-kubernetes-operator/127962962?tag=3f0dc2e),
we are still seeing this same issue.

Here's the log message for the last savepoint (log timestamps are in UTC):

2023-10-21 10:21:14,023 INFO
>  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] -
> Completed checkpoint 87794 for job ee4f7c678794ee16506f9b41425c244e
> (698450687 bytes, checkpointDuration=5601 ms, finalizationTime=296 ms).


Four minutes later, a ConnectException occurred, and the jobmanager first
attempted to restart from the last savepoint:

2023-10-21 10:25:30,725 WARN  akka.remote.transport.netty.NettyTransport
>                 [] - Remote connection to [null] failed with
> java.net.ConnectException: Connection refused: /10.11.181.62:6122
> 2023-10-21 10:25:30,726 WARN  akka.remote.ReliableDeliverySupervisor
>                 [] - Association with remote system [akka.tcp://
> flink@10.11.181.62:6122] has failed, address is now gated for [50] ms.
> Reason: [Association failed with [akka.tcp://flink@10.11.181.62:6122]]
> Caused by: [java.net.ConnectException: Connection refused: /
> 10.11.181.62:6122]
> 2023-10-21 10:25:37,935 WARN
>  org.apache.flink.runtime.taskmanager.TaskManagerLocation     [] - No
> hostname could be resolved for the IP address 10.11.202.152, using IP
> address as host name. Local input split assignment (such as for HDFS files)
> may be impacted.
> 2023-10-21 10:25:37,936 INFO
>  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Job
> <JOB_NAME> (ee4f7c678794ee16506f9b41425c244e) switched from state
> RESTARTING to RUNNING.
> 2023-10-21 10:25:37,936 INFO
>  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] -
> Restoring job ee4f7c678794ee16506f9b41425c244e from Savepoint 87794 @
> 1697883668126 for ee4f7c678794ee16506f9b41425c244e located at
> s3://<REDACTED>/savepoint-ee4f7c-9c6499126fd0.
> 2023-10-21 10:25:37,937 INFO
>  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - No
> master state to restore


However, a RecipientUnreachableException occurred and the HA data was
cleaned up. Eventually, the Flink cluster shut down and restarted:


> org.apache.flink.runtime.rpc.exceptions.RecipientUnreachableException:
> Could not send message
> [RemoteRpcInvocation(null.submitTask(TaskDeploymentDescriptor, JobMasterId,
> Time))] from sender [Actor[akka://flink/temp/taskmanager_0$ENE]] to
> recipient [Actor[akka.tcp://
> flink@10.11.181.62:6122/user/rpc/taskmanager_0#-43671188]], because the
> recipient is unreachable. This can either mean that the recipient has been
> terminated or that the remote RpcService is currently not reachable.
>         at
> org.apache.flink.runtime.rpc.akka.DeadLettersActor.handleDeadLetter(DeadLettersActor.java:61)
> ~[flink-rpc-akka_61fdae14-7548-48be-b7c8-11190d636910.jar:1.14.5]
> ...
> 2023-10-21 10:25:37,946 INFO
>  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] -
> Discarding the results produced by task execution
> 86d39b748d3655b6488fb9eaafb34f73.
> ...
> 2023-10-21 10:25:40,063 INFO
>  org.apache.flink.kubernetes.highavailability.KubernetesHaServices [] -
> Finished cleaning up the high availability data.
> ...
> 2023-10-21 10:25:40,170 INFO
>  org.apache.flink.runtime.entrypoint.ClusterEntrypoint        [] -
> Terminating cluster entrypoint process
> KubernetesApplicationClusterEntrypoint with exit code 1443.
> ...
> 2023-10-21 10:25:44,631 INFO
>  org.apache.flink.kubernetes.KubernetesResourceManagerDriver  [] -
> Recovered 2 pods from previous attempts, current attempt id is 2.
> ...
> 2023-10-21 10:25:44,631 INFO
>  org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] -
> Recovered 2 workers from previous attempt.
> ...
> 2023-10-21 10:25:45,015 ERROR
> org.apache.flink.runtime.rest.handler.job.JobsOverviewHandler [] -
> Unhandled exception.
> org.apache.flink.runtime.rpc.exceptions.RecipientUnreachableException:
> Could not send message
> [RemoteFencedMessage(b55fb309bb698aa75925f70bce254756,
> RemoteRpcInvocation(null.requestMultipleJobDetails(Time)))] from sender
> [Actor[akka.tcp://flink@10.11.76.167:6123/temp/dispatcher_0$Tb]] to
> recipient [Actor[akka://flink/user/rpc/dispatcher_0#1755511719]], because
> the recipient is unreachable. This can either mean that the recipient has
> been terminated or that the remote RpcService is currently not reachable.
> ...
> 2023-10-21 10:25:45,798 INFO
>  org.apache.flink.runtime.blob.FileSystemBlobStore            [] - Creating
> highly available BLOB storage directory at
> s3://<REDACTED>/<FLINK_APPLICATION>/blob


When the Flink cluster restarted, it no longer tried to restore from the
latest savepoint. Because the HA checkpoint store was empty after the
cleanup, it instead fell back to the savepoint referenced by
`execution.savepoint.path` in the flink-config. Since that savepoint was
taken a while ago and has already been disposed, the Flink cluster could
not start again:

2023-10-21 10:25:47,371 INFO
>  org.apache.flink.kubernetes.kubeclient.resources.KubernetesLeaderElector
> [] - Create KubernetesLeaderElector
> <FLINK_APPLICATION>-ee4f7c678794ee16506f9b41425c244e-jobmanager-leader with
> lock identity e29bd174-fb42-4473-ba09-f0fc3f614b34.
> ...
> 2023-10-21 10:25:47,476 INFO
>  org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStoreUtils
> [] - Found 0 checkpoints in
> KubernetesStateHandleStore{configMapName='<FLINK_APPLICATION>-ee4f7c678794ee16506f9b41425c244e-jobmanager-leader'}.
> ...
> 2023-10-21 10:25:47,650 INFO
>  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - No
> checkpoint found during restore.
> ...
> 2023-10-21 10:25:47,651 INFO
>  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Starting
> job ee4f7c678794ee16506f9b41425c244e from savepoint
> s3://<REDACTED>/savepoint-ee4f7c-550378a4b4d1 (allowing non restored state)
> ...
> 2023-10-21 10:25:47,703 INFO
>  org.apache.flink.runtime.dispatcher.StandaloneDispatcher     [] - Job
> ee4f7c678794ee16506f9b41425c244e reached terminal state FAILED.
> org.apache.flink.runtime.client.JobInitializationException: Could not
> start the JobMaster.
> ...
> Caused by: java.util.concurrent.CompletionException:
> java.lang.RuntimeException: java.io.FileNotFoundException: Cannot find
> checkpoint or savepoint file/directory
> 's3://<REDACTED>/savepoint-ee4f7c-550378a4b4d1' on file system 's3'.
>
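
To confirm that the checkpoint pointers really were gone from the HA
metadata, we looked at the leader ConfigMap named in the "Found 0
checkpoints" line above. Roughly something like this (a sketch;
<NAMESPACE> is a placeholder for our namespace):

# List the entries stored in the HA ConfigMap backing the checkpoint store
kubectl -n <NAMESPACE> get configmap \
  <FLINK_APPLICATION>-ee4f7c678794ee16506f9b41425c244e-jobmanager-leader \
  -o jsonpath='{.data}'

After the cleanup there are no checkpoint entries left in it, which lines
up with the "Found 0 checkpoints" and "No checkpoint found during restore"
messages above.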

Here is the execution.savepoint.path entry in the ConfigMap
flink-config-<FLINK_APPLICATION>:

execution.savepoint.path: s3://<REDACTED>/savepoint-ee4f7c-550378a4b4d1
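
We pulled that value straight from the ConfigMap, e.g. with something like
this (<NAMESPACE> is a placeholder):

# Dump the generated flink-conf and look for the savepoint path
kubectl -n <NAMESPACE> get configmap flink-config-<FLINK_APPLICATION> \
  -o yaml | grep 'execution.savepoint.path'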


This Flink application is running on Flink 1.14.
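
For reference, the `upgradeMode: last-state` setup Nathan mentions earlier
in this thread (quoted below) would look roughly like the following in the
FlinkDeployment. This is only a sketch, not an exact spec: the HA keys are
the standard Flink Kubernetes HA settings, and the storageDir is a
placeholder.

spec:
  job:
    upgradeMode: last-state
  flinkConfiguration:
    # Kubernetes HA is required for last-state upgrades; with HA metadata
    # in place the JobManager recovers from the latest checkpoint rather
    # than from execution.savepoint.path.
    high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
    high-availability.storageDir: s3://<SOMETHING>/ha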

On Sat, Sep 23, 2023 at 12:15 AM Gyula Fóra <gyula.f...@gmail.com> wrote:

> Hi
>
> Operator savepoint retention and savepoint upgrades have nothing to do
> with each other I think. Retention is only for periodic savepoints
> triggered by the operator itself.
>
> I would upgrade to the latest 1.6.0 operator version before investigating
> further.
>
> Cheers
> Gyula
>
>
> On Sat, 23 Sep 2023 at 06:02, Nathan Moderwell <
> nathan.moderw...@robinhood.com> wrote:
>
>> Small update on this. I see that the issue is that we use `upgradeMode:
>> savepoint`, but have not configured the operator to retain savepoints for
>> long enough (the previous operator we used never deleted savepoints so we
>> didn't run into this). I am reconfiguring to use `upgradeMode: last-state`
>> and enabling HA to see if this provides us more stable job restoration on
>> pod disruption.
>>
>> On Fri, Sep 22, 2023 at 10:20 AM Nathan Moderwell <
>> nathan.moderw...@robinhood.com> wrote:
>>
>>> Hi flink-kubernetes-operator maintainers,
>>>
>>> We have recently migrated to the official operator and are seeing a new
>>> issue where our FlinkDeployments can fail and crashloop looking for a
>>> non-existent savepoint. On further inspection, the job is attempting to
>>> restart from the savepoint specified in execution.savepoint.path. This
>>> config is new for us (it wasn't set by our previous operator) and seems
>>> to be set automatically behind the scenes by the official operator. We
>>> see that the savepoint in execution.savepoint.path existed but gets
>>> deleted after some amount of time (in the latest example, a few hours).
>>> Then, when there is some pod disruption, the job attempts to restart
>>> from the now-deleted savepoint and starts crashlooping.
>>>
>>> Hoping you can help us troubleshoot and figure out if this can be solved
>>> through configuration (we are using equivalent configs from our previous
>>> operator where we did not have this issue). Adding some details on version
>>> and k8s state for your reference. Thank you for your support!
>>>
>>> Flink Version: 1.14.5
>>> Flink Operator Version: 1.4.0
>>>
>>> At the time of the issue, here is the flink-config we see in the
>>> configmap (the savepoint savepoint-bad5e5-6ab08cf0808e has been deleted
>>> from s3 at this point):
>>>
>>> kubernetes.jobmanager.replicas: 1
>>> jobmanager.rpc.address: <SOMETHING>
>>> metrics.scope.task:
>>> flink.taskmanager.job.<job_name>.task.<task_name>.metric
>>> kubernetes.service-account: <SOMETHING>
>>> kubernetes.cluster-id: <SOMETHING>
>>> pipeline.auto-generate-uids: false
>>> metrics.scope.tm: flink.taskmanager.metric
>>> parallelism.default: 2
>>> kubernetes.namespace: <SOMETHING>
>>> metrics.reporters: prom
>>> kubernetes.jobmanager.owner.reference: <SOMETHING>
>>> metrics.reporter.prom.port: 9090
>>> taskmanager.memory.process.size: 10G
>>> kubernetes.internal.jobmanager.entrypoint.class:
>>> org.apache.flink.kubernetes.entrypoint.KubernetesApplicationClusterEntrypoint
>>> pipeline.name: <SOMETHING>
>>> execution.savepoint.path: s3://<SOMETHING>/savepoint-bad5e5-6ab08cf0808e
>>> kubernetes.pod-template-file:
>>> /tmp/flink_op_generated_podTemplate_12924532349572558288.yaml
>>> state.backend.rocksdb.localdir: /rocksdb/
>>> kubernetes.pod-template-file.taskmanager:
>>> /tmp/flink_op_generated_podTemplate_1129545383743356980.yaml
>>> web.cancel.enable: false
>>> execution.checkpointing.timeout: 5 min
>>> kubernetes.container.image.pull-policy: IfNotPresent
>>> $internal.pipeline.job-id: bad5e5682b8f4fbefbf75b00d285ac10
>>> kubernetes.jobmanager.cpu: 2.0
>>> state.backend: filesystem
>>> $internal.flink.version: v1_14
>>> kubernetes.pod-template-file.jobmanager:
>>> /tmp/flink_op_generated_podTemplate_824610597202468981.yaml
>>> blob.server.port: 6124
>>> kubernetes.jobmanager.annotations:
>>> flinkdeployment.flink.apache.org/generation:14
>>> metrics.scope.operator:
>>> flink.taskmanager.job.<job_name>.operator.<operator_name>.metric
>>> state.savepoints.dir: s3://<SOMETHING>/savepoints
>>> kubernetes.taskmanager.cpu: 2.0
>>> execution.savepoint.ignore-unclaimed-state: true
>>> $internal.application.program-args:
>>> kubernetes.container.image: <SOMETHING>
>>> taskmanager.numberOfTaskSlots: 1
>>> metrics.scope.jm.job: flink.jobmanager.job.<job_name>.metric
>>> kubernetes.rest-service.exposed.type: ClusterIP
>>> metrics.reporter.prom.class:
>>> org.apache.flink.metrics.prometheus.PrometheusReporter
>>> $internal.application.main: <SOMETHING>
>>> metrics.scope.jm: flink.jobmanager.metric
>>> execution.target: kubernetes-application
>>> jobmanager.memory.process.size: 10G
>>> metrics.scope.tm.job: flink.taskmanager.job.<job_name>.metric
>>> taskmanager.rpc.port: 6122
>>> internal.cluster.execution-mode: NORMAL
>>> execution.checkpointing.externalized-checkpoint-retention:
>>> RETAIN_ON_CANCELLATION
>>> pipeline.jars: local:///build/flink/usrlib/<SOMETHING>.jar
>>> state.checkpoints.dir: s3://<SOMETHING>/checkpoints
>>>
>>> At the time of the issue, here is our FlinkDeployment Spec:
>>>
>>> Spec:
>>>   Flink Configuration:
>>>     execution.checkpointing.timeout:                  5 min
>>>     kubernetes.operator.job.restart.failed:           true
>>>     kubernetes.operator.periodic.savepoint.interval:  600s
>>>     metrics.reporter.prom.class:
>>>  org.apache.flink.metrics.prometheus.PrometheusReporter
>>>     metrics.reporter.prom.port:                       9090
>>>     metrics.reporters:                                prom
>>>     metrics.scope.jm:
>>> flink.jobmanager.metric
>>>     metrics.scope.jm.job:
>>> flink.jobmanager.job.<job_name>.metric
>>>     metrics.scope.operator:
>>> flink.taskmanager.job.<job_name>.operator.<operator_name>.metric
>>>     metrics.scope.task:
>>> flink.taskmanager.job.<job_name>.task.<task_name>.metric
>>>     metrics.scope.tm:
>>> flink.taskmanager.metric
>>>     metrics.scope.tm.job:
>>> flink.taskmanager.job.<job_name>.metric
>>>     pipeline.auto-generate-uids:                      false
>>>     pipeline.name:                                    <SOMETHING>
>>>     state.backend:                                    filesystem
>>>     state.backend.rocksdb.localdir:                   /rocksdb/
>>>     state.checkpoints.dir:
>>>  s3://<SOMETHING>/checkpoints
>>>     state.savepoints.dir:
>>> s3://<SOMETHING>/savepoints
>>>   Flink Version:                                      v1_14
>>>   Image:                                              <SOMETHING>
>>>   Image Pull Policy:                                  IfNotPresent
>>>   Job:
>>>     Allow Non Restored State:  true
>>>     Args:
>>>     Entry Class:             <SOMETHING>
>>>     Initial Savepoint Path:
>>>  s3a://<SOMETHING>/savepoint-bad5e5-577c6a76aec5
>>>     Jar URI:                 local:///build/flink/usrlib/<SOMETHING>.jar
>>>     Parallelism:             2
>>>     State:                   running
>>>     Upgrade Mode:            savepoint
>>>
>>>
>>>
>>
>> --
>>
>> <http://www.robinhood.com/>
>>
>> Nathan Moderwell
>>
>> Senior Machine Learning Engineer
>>
>> Menlo Park, CA
>>
>> Don't copy, share, or use this email without permission. If you received
>> it by accident, please let us know and then delete it right away.
>>
>

-- 

<http://www.robinhood.com/>

Tony Chen

Software Engineer

Menlo Park, CA

Don't copy, share, or use this email without permission. If you received it
by accident, please let us know and then delete it right away.
