Gyula Fora created FLINK-30268:
----------------------------------

             Summary: HA metadata and other cluster submission related errors 
should not throw DeploymentFailedException
                 Key: FLINK-30268
                 URL: https://issues.apache.org/jira/browse/FLINK-30268
             Project: Flink
          Issue Type: Improvement
          Components: Kubernetes Operator
            Reporter: Gyula Fora
            Assignee: Peter Vary
             Fix For: kubernetes-operator-1.3.0


Currently, most critical cluster submission errors, as well as the checks that 
validate HA metadata before deployment, end up throwing DeploymentFailedException.

This puts the operator into an inconsistent state and hides the original error 
in subsequent reconciliation loops:


{noformat}
flink-kubernetes-operator 2022-12-01 21:55:03,978 o.a.f.k.o.l.AuditUtils        
 [INFO ][default/basic-checkpoint-ha-example] >>> Status | Info    | UPGRADING  
     | The resource is being upgraded 
flink-kubernetes-operator 2022-12-01 21:55:03,992 o.a.f.k.o.l.AuditUtils        
 [INFO ][default/basic-checkpoint-ha-example] >>> Event  | Info    | SUBMIT     
     | Starting deployment
flink-kubernetes-operator 2022-12-01 21:55:03,992 
o.a.f.k.o.s.AbstractFlinkService [INFO ][default/basic-checkpoint-ha-example] 
Deploying application cluster requiring last-state from HA metadata
flink-kubernetes-operator 2022-12-01 21:55:03,997 
o.a.f.k.o.c.FlinkDeploymentController 
[ERROR][default/basic-checkpoint-ha-example] Flink Deployment failed
flink-kubernetes-operator 
org.apache.flink.kubernetes.operator.exception.DeploymentFailedException: HA 
metadata not available to restore from last state. It is possible that the job 
has finished or terminally failed, or the configmaps have been deleted. Manual 
restore required.
flink-kubernetes-operator     at 
org.apache.flink.kubernetes.operator.service.AbstractFlinkService.validateHaMetadataExists(AbstractFlinkService.java:844)
flink-kubernetes-operator     at 
org.apache.flink.kubernetes.operator.service.AbstractFlinkService.submitApplicationCluster(AbstractFlinkService.java:177)
flink-kubernetes-operator     at 
org.apache.flink.kubernetes.operator.reconciler.deployment.ApplicationReconciler.deploy(ApplicationReconciler.java:195)
flink-kubernetes-operator     at 
org.apache.flink.kubernetes.operator.reconciler.deployment.ApplicationReconciler.deploy(ApplicationReconciler.java:60)
flink-kubernetes-operator     at 
org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractJobReconciler.restoreJob(AbstractJobReconciler.java:210)
flink-kubernetes-operator     at 
org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractJobReconciler.reconcileSpecChange(AbstractJobReconciler.java:142)
flink-kubernetes-operator     at 
org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractFlinkResourceReconciler.reconcile(AbstractFlinkResourceReconciler.java:161)
flink-kubernetes-operator     at 
org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractFlinkResourceReconciler.reconcile(AbstractFlinkResourceReconciler.java:62)
flink-kubernetes-operator     at 
org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.reconcile(FlinkDeploymentController.java:123)
flink-kubernetes-operator     at 
org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.reconcile(FlinkDeploymentController.java:54)
flink-kubernetes-operator     at 
io.javaoperatorsdk.operator.processing.Controller$1.execute(Controller.java:136)
flink-kubernetes-operator     at 
io.javaoperatorsdk.operator.processing.Controller$1.execute(Controller.java:94)
flink-kubernetes-operator     at 
org.apache.flink.kubernetes.operator.metrics.OperatorJosdkMetrics.timeControllerExecution(OperatorJosdkMetrics.java:80)
flink-kubernetes-operator     at 
io.javaoperatorsdk.operator.processing.Controller.reconcile(Controller.java:93)
flink-kubernetes-operator     at 
io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.reconcileExecution(ReconciliationDispatcher.java:130)
flink-kubernetes-operator     at 
io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleReconcile(ReconciliationDispatcher.java:110)
flink-kubernetes-operator     at 
io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleDispatch(ReconciliationDispatcher.java:81)
flink-kubernetes-operator     at 
io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleExecution(ReconciliationDispatcher.java:54)
flink-kubernetes-operator     at 
io.javaoperatorsdk.operator.processing.event.EventProcessor$ReconcilerExecutor.run(EventProcessor.java:406)
flink-kubernetes-operator     at 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
flink-kubernetes-operator     at 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
flink-kubernetes-operator     at java.base/java.lang.Thread.run(Unknown Source)
flink-kubernetes-operator 2022-12-01 21:55:04,034 o.a.f.k.o.l.AuditUtils        
 [INFO ][default/basic-checkpoint-ha-example] >>> Event  | Warning | 
RESTOREFAILED   | HA metadata not available to restore from last state. It is 
possible that the job has finished or terminally failed, or the configmaps have 
been deleted. Manual restore required.
flink-kubernetes-operator 2022-12-01 21:55:04,034 
o.a.f.k.o.c.FlinkDeploymentController [INFO 
][default/basic-checkpoint-ha-example] End of reconciliation
flink-kubernetes-operator 2022-12-01 21:55:04,054 o.a.f.k.o.l.AuditUtils        
 [INFO ][default/basic-checkpoint-ha-example] >>> Status | Error   | UPGRADING  
     | 
{"type":"org.apache.flink.kubernetes.operator.exception.DeploymentFailedException","message":"HA
 metadata not available to restore from last state. It is possible that the job 
has finished or terminally failed, or the configmaps have been deleted. Manual 
restore 
required.","additionalMetadata":{"reason":"RestoreFailed"},"throwableList":[]} 
flink-kubernetes-operator 2022-12-01 21:55:19,056 
o.a.f.k.o.c.FlinkDeploymentController [INFO 
][default/basic-checkpoint-ha-example] Starting reconciliation
flink-kubernetes-operator 2022-12-01 21:55:19,058 
o.a.f.k.o.r.d.AbstractFlinkResourceReconciler [INFO 
][default/basic-checkpoint-ha-example] UPGRADE change(s) detected 
(FlinkDeploymentSpec[job.state=RUNNING] differs from 
FlinkDeploymentSpec[job.state=SUSPENDED]), starting reconciliation.
flink-kubernetes-operator 2022-12-01 21:55:19,092 o.a.f.k.o.l.AuditUtils        
 [INFO ][default/basic-checkpoint-ha-example] >>> Status | Info    | UPGRADING  
     | The resource is being upgraded 
flink-kubernetes-operator 2022-12-01 21:55:19,119 
o.a.f.k.o.r.d.ApplicationReconciler 
[ERROR][default/basic-checkpoint-ha-example] Invalid status for deployment: 
FlinkDeploymentStatus(super=CommonStatus(jobStatus=JobStatus(jobName=CarTopSpeedWindowingExample,
 jobId=8d5c59b7e960984cd845b9977754d2ef, state=RECONCILING, 
startTime=1669931677233, updateTime=1669931696153, 
savepointInfo=SavepointInfo(lastSavepoint=null, triggerId=null, 
triggerTimestamp=null, triggerType=null, formatType=null, savepointHistory=[], 
lastPeriodicSavepointTimestamp=0)), error=null), 
clusterInfo={flink-version=1.15.2, flink-revision=69e8126 @ 
2022-08-17T14:58:06+02:00}, jobManagerDeploymentStatus=ERROR, 
reconciliationStatus=FlinkDeploymentReconciliationStatus(super=ReconciliationStatus(reconciliationTimestamp=1669931719059,
 
lastReconciledSpec={"spec":{"job":{"jarURI":"local:///opt/flink/examples/streaming/TopSpeedWindowing.jar","parallelism":2,"entryClass":null,"args":[],"state":"suspended","savepointTriggerNonce":0,"initialSavepointPath":null,"upgradeMode":"last-state","allowNonRestoredState":null},"restartNonce":2,"flinkConfiguration":{"high-availability":"org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory","high-availability.storageDir":"file:///flink-data/ha","state.checkpoints.dir":"file:///flink-data/checkpoints","state.savepoints.dir":"file:///flink-data/savepoints","taskmanager.numberOfTaskSlots":"2"},"image":"flink:1.15","imagePullPolicy":null,"serviceAccount":"flink","flinkVersion":"v1_15","ingress":null,"podTemplate":{"apiVersion":"v1","kind":"Pod","spec":{"containers":[{"name":"flink-main-container","volumeMounts":[{"mountPath":"/flink-data","name":"flink-volume"}]}],"volumes":[{"hostPath":{"path":"/tmp/flink","type":"Directory"},"name":"flink-volume"}]}},"jobManager":{"resource":{"cpu":1.0,"memory":"2048m"},"replicas":1,"podTemplate":null},"taskManager":{"resource":{"cpu":1.0,"memory":"2048m"},"replicas":null,"podTemplate":null},"logConfiguration":null,"mode":null},"resource_metadata":{"apiVersion":"flink.apache.org/v1beta1","metadata":{"generation":5},"firstDeployment":false}},
 lastStableSpec=null, state=UPGRADING)), 
taskManager=TaskManagerInfo(labelSelector=, replicas=0))
flink-kubernetes-operator 2022-12-01 21:55:19,133 o.a.f.k.o.l.AuditUtils        
 [INFO ][default/basic-checkpoint-ha-example] >>> Event  | Warning | 
CLUSTERDEPLOYMENTEXCEPTION | This indicates a bug...
flink-kubernetes-operator 2022-12-01 21:55:19,136 
o.a.f.k.o.r.ReconciliationUtils [WARN ][default/basic-checkpoint-ha-example] 
Attempt count: 0, last attempt: false
flink-kubernetes-operator 2022-12-01 21:55:19,163 o.a.f.k.o.l.AuditUtils        
 [INFO ][default/basic-checkpoint-ha-example] >>> Status | Error   | UPGRADING  
     | 
{"type":"org.apache.flink.kubernetes.operator.exception.ReconciliationException","message":"java.lang.RuntimeException:
 This indicates a 
bug...","throwableList":[{"type":"java.lang.RuntimeException","message":"This 
indicates a bug..."}]} 
flink-kubernetes-operator 2022-12-01 21:55:19,164 
i.j.o.p.e.ReconciliationDispatcher [ERROR][default/basic-checkpoint-ha-example] 
Error during event processing ExecutionScope{ resource id: 
ResourceID{name='basic-checkpoint-ha-example', namespace='default'}, version: 
350553} failed.
flink-kubernetes-operator 
org.apache.flink.kubernetes.operator.exception.ReconciliationException: 
java.lang.RuntimeException: This indicates a bug...
flink-kubernetes-operator     at 
org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.reconcile(FlinkDeploymentController.java:133)
flink-kubernetes-operator     at 
org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.reconcile(FlinkDeploymentController.java:54)
flink-kubernetes-operator     at 
io.javaoperatorsdk.operator.processing.Controller$1.execute(Controller.java:136)
flink-kubernetes-operator     at 
io.javaoperatorsdk.operator.processing.Controller$1.execute(Controller.java:94)
flink-kubernetes-operator     at 
org.apache.flink.kubernetes.operator.metrics.OperatorJosdkMetrics.timeControllerExecution(OperatorJosdkMetrics.java:80)
flink-kubernetes-operator     at 
io.javaoperatorsdk.operator.processing.Controller.reconcile(Controller.java:93)
flink-kubernetes-operator     at 
io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.reconcileExecution(ReconciliationDispatcher.java:130)
flink-kubernetes-operator     at 
io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleReconcile(ReconciliationDispatcher.java:110)
flink-kubernetes-operator     at 
io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleDispatch(ReconciliationDispatcher.java:81)
flink-kubernetes-operator     at 
io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleExecution(ReconciliationDispatcher.java:54)
flink-kubernetes-operator     at 
io.javaoperatorsdk.operator.processing.event.EventProcessor$ReconcilerExecutor.run(EventProcessor.java:406)
flink-kubernetes-operator     at 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
flink-kubernetes-operator     at 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
flink-kubernetes-operator     at java.base/java.lang.Thread.run(Unknown Source)
flink-kubernetes-operator Caused by: java.lang.RuntimeException: This indicates 
a bug...
flink-kubernetes-operator     at 
org.apache.flink.kubernetes.operator.reconciler.deployment.ApplicationReconciler.deploy(ApplicationReconciler.java:180)
flink-kubernetes-operator     at 
org.apache.flink.kubernetes.operator.reconciler.deployment.ApplicationReconciler.deploy(ApplicationReconciler.java:60)
flink-kubernetes-operator     at 
org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractJobReconciler.restoreJob(AbstractJobReconciler.java:210)
flink-kubernetes-operator     at 
org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractJobReconciler.reconcileSpecChange(AbstractJobReconciler.java:142)
flink-kubernetes-operator     at 
org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractFlinkResourceReconciler.reconcile(AbstractFlinkResourceReconciler.java:161)
flink-kubernetes-operator     at 
org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractFlinkResourceReconciler.reconcile(AbstractFlinkResourceReconciler.java:62)
flink-kubernetes-operator     at 
org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.reconcile(FlinkDeploymentController.java:123)
flink-kubernetes-operator     ... 13 more
{noformat}

The root cause is that DeploymentFailedException was originally introduced so 
that the observer could signal a JobManager deployment failure (after the 
cluster was submitted). The error handler in the controller therefore updates 
the jobManagerDeploymentStatus and the job state, which causes the problem 
when the exception is thrown during submission instead.

To avoid this, we should introduce a new exception type or use something more 
suitable. In most of these cases we should not touch the 
jobManagerDeploymentStatus or the job status, and should simply retrigger the 
reconciliation. This will keep the CR in an error loop that triggers warnings, 
but that is expected in these critical failure scenarios.
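
The proposal can be illustrated with a minimal, self-contained sketch. This is 
not the operator's actual code: the Status class, the handleError method, and 
the ClusterSubmissionException name are all hypothetical stand-ins, and the 
status values are simply the ones visible in the log above. It only shows the 
intended split: the handler rewrites the deployment status and job state for a 
DeploymentFailedException, and only records the error for a submission-time 
failure.

```java
public class ErrorHandlingSketch {

    /** Simplified stand-in for the status fields mentioned in the issue. */
    static final class Status {
        String jobManagerDeploymentStatus = "READY";
        String jobState = "RUNNING";
        String error; // last recorded error message, null if none
    }

    /** Stand-in for the existing type: a JobManager deployment failure seen by the observer. */
    static class DeploymentFailedException extends RuntimeException {
        DeploymentFailedException(String message) {
            super(message);
        }
    }

    /** Hypothetical new type for failures raised while submitting the cluster. */
    static class ClusterSubmissionException extends RuntimeException {
        ClusterSubmissionException(String message) {
            super(message);
        }
    }

    /**
     * Hypothetical controller error handler. The error is always recorded, but
     * the deployment status and job state are only rewritten for
     * DeploymentFailedException. Scheduling the retry is left out of the sketch.
     */
    static void handleError(Status status, RuntimeException e) {
        status.error = e.getMessage();
        if (e instanceof DeploymentFailedException) {
            // Illustrative values, matching what the log above shows.
            status.jobManagerDeploymentStatus = "ERROR";
            status.jobState = "RECONCILING";
        }
        // Any other exception leaves the rest of the status untouched, so the
        // next reconciliation starts from the same state and reports the same error.
    }

    public static void main(String[] args) {
        Status afterSubmissionError = new Status();
        handleError(afterSubmissionError,
                new ClusterSubmissionException("HA metadata not available"));
        System.out.println("submission error -> jmStatus="
                + afterSubmissionError.jobManagerDeploymentStatus
                + ", jobState=" + afterSubmissionError.jobState);

        Status afterDeploymentFailure = new Status();
        handleError(afterDeploymentFailure,
                new DeploymentFailedException("JobManager deployment failed"));
        System.out.println("deployment failure -> jmStatus="
                + afterDeploymentFailure.jobManagerDeploymentStatus
                + ", jobState=" + afterDeploymentFailure.jobState);
    }
}
```

With a split like this, a missing-HA-metadata failure would keep surfacing as 
the same error on every retry instead of being replaced by an "Invalid status 
for deployment" failure in the next loop.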



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
