In general, the Flink JobManager HA/client mechanism ensures that REST
requests end up at the current leader.
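
To illustrate why leader routing matters for asynchronous operations such
as a savepoint trigger, here is a toy model. It is not Flink or operator
code: ToyJobManager, stop_with_savepoint and the round-robin client are
all hypothetical names made up for this sketch, and it only assumes that
each endpoint tracks its triggered operations locally.

```python
import itertools
import uuid


class OperationNotFound(Exception):
    """Raised when an endpoint has no record of the requested operation."""


class ToyJobManager:
    """Hypothetical stand-in for one JobManager REST endpoint."""

    def __init__(self, name):
        self.name = name
        self._operations = {}  # trigger id -> status, local to this instance

    def trigger_savepoint(self):
        trigger_id = uuid.uuid4().hex
        self._operations[trigger_id] = "COMPLETED"
        return trigger_id

    def savepoint_status(self, trigger_id):
        if trigger_id not in self._operations:
            raise OperationNotFound(
                f"{self.name}: operation not found under key {trigger_id}"
            )
        return self._operations[trigger_id]


def stop_with_savepoint(pick_endpoint):
    """Trigger a savepoint, then poll it, picking an endpoint per request."""
    trigger_id = pick_endpoint().trigger_savepoint()
    return pick_endpoint().savepoint_status(trigger_id)


managers = [ToyJobManager(f"jm-{i}") for i in range(3)]
leader = managers[0]

# Leader-aware routing: both requests reach the same instance.
leader_result = stop_with_savepoint(lambda: leader)

# Naive round-robin: the status poll lands on a different instance.
rotation = itertools.cycle(managers)
try:
    round_robin_result = stop_with_savepoint(lambda: next(rotation))
except OperationNotFound as exc:
    round_robin_result = f"failed: {exc}"

print(leader_result)
print(round_robin_result)
```

With leader-aware routing the status poll finds the operation; with the
round-robin client the second request reaches jm-1, which never saw the
trigger. Whether anything like this happened in a real cluster is a
separate question.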

In your case it's not clear what the actual cause of the issue was.

What I would do is upgrade to the latest operator version (1.1.0), where
the savepoint upgrade mechanism has been hardened.
If your cluster is already stopped with the savepoint but the operator did
not get the response back, you might have to perform the steps outlined in:
https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/custom-resource/job-management/#manual-recovery

Savepoint upgrades are significantly more robust with Flink 1.15+
because there we can keep the cluster/REST API around even after the
application has been stopped. In Flink 1.14 and earlier, the cluster
disappears after the job is stopped, making it difficult to handle all
situations.

Side note: I think in most cases you should not need more than 1
JobManager replica. You still have the same HA guarantees with 1 replica:
if it goes down, it will be restarted. The behaviour is generally the same.
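
As a rough sketch, a single-replica HA setup could be expressed with a
FlinkDeployment fragment like the one below, written as a Python dict
that serializes to JSON. The resource name, namespace and storage
directory are placeholders, and the fragment is incomplete (no image, jar
or resources), so please check the field names against the CRD reference
for your operator version.

```python
import json

# Partial FlinkDeployment manifest; values marked "placeholder" are made up.
flink_deployment = {
    "apiVersion": "flink.apache.org/v1beta1",
    "kind": "FlinkDeployment",
    "metadata": {"name": "my-job", "namespace": "my-namespace"},  # placeholder
    "spec": {
        "flinkVersion": "v1_15",
        "flinkConfiguration": {
            # Kubernetes HA stays enabled even with a single JobManager.
            "high-availability": (
                "org.apache.flink.kubernetes.highavailability."
                "KubernetesHaServicesFactory"
            ),
            "high-availability.storageDir": "s3://my-bucket/flink-ha",  # placeholder
        },
        "jobManager": {"replicas": 1},
        "job": {"upgradeMode": "savepoint"},
    },
}

manifest_json = json.dumps(flink_deployment, indent=2)
print(manifest_json)
```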

Cheers,
Gyula

On Thu, Aug 11, 2022 at 11:15 AM Evgeniy Lyutikov <eblyuti...@avito.ru>
wrote:

> Hi,
>
> I'm using Flink 1.14.4 with Flink Kubernetes Operator 1.0.1 in an HA
> configuration with 3 JobManagers.
>
> When I try to change the job configuration, the job restarts with a
> savepoint trigger, and an error occurs each time:
>
>
> 2022-08-10 12:04:21,142 o.a.f.k.o.c.FlinkDeploymentController [INFO
> ][job-namespace/job-namespace] Starting reconciliation
> 2022-08-10 12:04:21,143 o.a.f.k.o.o.JobStatusObserver  [INFO
> ][job-namespace/job-namespace] Observing job status
> 2022-08-10 12:04:21,154 o.a.f.k.o.o.JobStatusObserver  [INFO
> ][job-namespace/job-namespace] Job status (RUNNING) unchanged
> 2022-08-10 12:04:21,155 o.a.f.k.o.c.FlinkConfigManager [INFO
> ][job-namespace/job-namespace] Generating new config
> 2022-08-10 12:04:21,157 o.a.f.k.o.r.d.ApplicationReconciler [INFO
> ][job-namespace/job-namespace] Upgrading/Restarting running job, suspending
> first...
> 2022-08-10 12:04:21,157 o.a.f.k.o.r.d.ApplicationReconciler [INFO
> ][job-namespace/job-namespace] Job is in running state, ready for upgrade
> with SAVEPOINT
> 2022-08-10 12:04:21,157 o.a.f.k.o.s.FlinkService       [INFO
> ][job-namespace/job-namespace] Suspending job with savepoint.
> 2022-08-10 12:04:21,171 o.a.f.k.o.r.ReconciliationUtils [WARN
> ][job-namespace/job-namespace] Attempt count: 5, last attempt: true
> 2022-08-10 12:04:21,242 i.j.o.p.e.ReconciliationDispatcher
> [ERROR][job-namespace/job-namespace] Error during event processing
> ExecutionScope{ resource id: CustomResourceID{name='job-namespace',
> namespace='job-namespace'}, version: null} failed.
> org.apache.flink.kubernetes.operator.exception.ReconciliationException:
> java.util.concurrent.ExecutionException:
> org.apache.flink.runtime.rest.util.RestClientException:
> [org.apache.flink.runtime.rest.NotFoundException: Operation not found under
> key:
> org.apache.flink.runtime.rest.handler.job.AsynchronousJobOperationKey@b41b16a8
>
> After 5 retries
>
> 2022-08-10 12:04:21,157 o.a.f.k.o.r.d.ApplicationReconciler [INFO
> ][job-namespace/job-namespace] Job is in running state, ready for upgrade
> with SAVEPOINT
> 2022-08-10 12:04:21,157 o.a.f.k.o.s.FlinkService       [INFO
> ][job-namespace/job-namespace] Suspending job with savepoint.
> 2022-08-10 12:04:21,171 o.a.f.k.o.r.ReconciliationUtils [WARN
> ][job-namespace/job-namespace] Attempt count: 5, last attempt: true
> 2022-08-10 12:04:21,242 i.j.o.p.e.ReconciliationDispatcher
> [ERROR][job-namespace/job-namespace] Error during event processing
> ExecutionScope{ resource id: CustomResourceID{name='job-namespace',
> namespace='job-namespace'}, version: null} failed.
> org.apache.flink.kubernetes.operator.exception.ReconciliationException:
> java.util.concurrent.ExecutionException:
> org.apache.flink.runtime.rest.util.RestClientException:
> [org.apache.flink.runtime.rest.NotFoundException: Operation not found under
> key:
> org.apache.flink.runtime.rest.handler.job.AsynchronousJobOperationKey@b41b16a8
> 2022-08-10 12:04:21,243 i.j.o.p.e.EventProcessor
>  [ERROR][job-namespace/job-namespace] Exhausted retries for ExecutionScope{
> resource id: CustomResourceID{name='job-namespace',
> namespace='job-namespace'}, version: null}
> 2022-08-10 12:04:53,299 o.a.f.k.o.c.FlinkDeploymentController [INFO
> ][job-namespace/job-namespace] Starting reconciliation
> 2022-08-10 12:04:53,299 o.a.f.k.o.o.JobStatusObserver  [INFO
> ][job-namespace/job-namespace] Observing job status
> 2022-08-10 12:05:03,322 o.a.f.s.n.i.n.c.AbstractChannel [WARN ]
> Force-closing a channel whose registration task was not accepted by an
> event loop: [id: 0x4fb8bb3b]
> java.util.concurrent.RejectedExecutionException: event executor terminated
> 2022-08-10 12:05:03,323 o.a.f.s.n.i.n.u.c.D.rejectedExecution [ERROR]
> Failed to submit a listener notification task. Event loop shut down?
> java.util.concurrent.RejectedExecutionException: event executor terminated
> 2022-08-10 12:05:03,323 o.a.f.k.o.o.JobStatusObserver
> [ERROR][job-namespace/job-namespace] Exception while listing jobs
> java.util.concurrent.TimeoutException
> 2022-08-10 12:05:03,324 o.a.f.k.o.o.d.ApplicationObserver [INFO
> ][job-namespace/job-namespace] Observing JobManager deployment. Previous
> status: READY
> 2022-08-10 12:05:03,324 o.a.f.k.o.o.d.ApplicationObserver
> [ERROR][job-namespace/job-namespace] Missing JobManager deployment
>
> I suppose the problem is that the savepoint trigger request and the
> request for its status are sent to different JobManagers.
>
> Does the operator have service discovery to find the leader JobManager
> and send its requests there?
>
>
>
