[ 
https://issues.apache.org/jira/browse/FLINK-33522?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels updated FLINK-33522:
---------------------------------------
    Description: 
Under certain circumstances, savepoint creation can succeed but the job fails 
afterwards. One example is when there are messages being distributed by the 
source coordinator to finished tasks. This is possibly a Flink bug although 
it's not clear yet how to solve the issue.

After the savepoint succeeded Flink fails the job like this:
{noformat}
Source (1/2) 
(cd4d56ddb71c0e763cc400bcfe2fd8ac_4081cf0163fcce7fe6af0cf07ad2d43c_0_0) 
switched from RUNNING to FAILED on host-taskmanager-1-1 @ ip(dataPort=36519). 
{noformat}
{noformat}
An OperatorEvent from an OperatorCoordinator to a task was lost. Triggering 
task failover to ensure consistency. Event: 'AddSplitEvents[[[B@722a23fa]]', 
targetTask: Source (1/2) - execution #0
Caused by:
org.apache.flink.runtime.operators.coordination.TaskNotRunningException: Task 
is not running, but in state FINISHED
   at 
org.apache.flink.runtime.taskmanager.Task.deliverOperatorEvent(Task.java:1502)
   at org.apache.flink.runtime.taskexecutor.TaskExecutor.sendOperatorEventToTask
{noformat}

Inside the operator this is processed as:

{noformat}
java.util.concurrent.CompletionException: 
org.apache.flink.runtime.scheduler.stopwithsavepoint.StopWithSavepointStoppingException:
 A savepoint has been created at: s3://..., but the corresponding job 
1b1a3061194c62ded6e2fe823b61b2ea failed during stopping. The savepoint is 
consistent, but might have uncommitted transactions. If you want to commit the 
transaction please restart a job from this savepoint. 

          
java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395) 
          
java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2022) 
          
org.apache.flink.kubernetes.operator.service.AbstractFlinkService.cancelJob(AbstractFlinkService.java:319)
 
          
org.apache.flink.kubernetes.operator.service.NativeFlinkService.cancelJob(NativeFlinkService.java:121)
 
          
org.apache.flink.kubernetes.operator.reconciler.deployment.ApplicationReconciler.cancelJob(ApplicationReconciler.java:223)
 
          
org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractJobReconciler.reconcileSpecChange(AbstractJobReconciler.java:122)
 
         
org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractFlinkResourceReconciler.reconcile(AbstractFlinkResourceReconciler.java:163)
          
org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.reconcile(FlinkDeploymentController.java:136)
 
          
org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.reconcile(FlinkDeploymentController.java:56)
 
          
io.javaoperatorsdk.operator.processing.Controller$1.execute(Controller.java:138)
 
          
io.javaoperatorsdk.operator.processing.Controller$1.execute(Controller.java:96) 
          
org.apache.flink.kubernetes.operator.metrics.OperatorJosdkMetrics.timeControllerExecution(OperatorJosdkMetrics.java:80)
 
          
io.javaoperatorsdk.operator.processing.Controller.reconcile(Controller.java:95) 
          
io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.reconcileExecution(ReconciliationDispatcher.java:139)
 
          
io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleReconcile(ReconciliationDispatcher.java:119)
 
          
io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleDispatch(ReconciliationDispatcher.java:89)
 
          
io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleExecution(ReconciliationDispatcher.java:62)
 
          
io.javaoperatorsdk.operator.processing.event.EventProcessor$ReconcilerExecutor.run(EventProcessor.java:414)
 
          
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) 
          
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) 
          java.lang.Thread.run(Thread.java:829) 
{noformat}

Subsequently we get the following because HA metadata is not available anymore. 
It has been cleared up after the terminal job failure:

{noformat}
org.apache.flink.kubernetes.operator.exception.RecoveryFailureException","message":"HA
 metadata not available to restore from last state. It is possible that the job 
has finished or terminally failed, or the configmaps have been deleted. 
{noformat}

The deployment needs to be manually restored from a savepoint.

  was:
Under certain circumstances, savepoint creation can succeed but the job fails 
afterwards. One example is when there are messages being distributed by the 
source coordinator to finished tasks. This is possibly a Flink bug although 
it's not clear how to solve this issue.

After the savepoint succeeded Flink fails the job like this:
{noformat}
Source (1/2) 
(cd4d56ddb71c0e763cc400bcfe2fd8ac_4081cf0163fcce7fe6af0cf07ad2d43c_0_0) 
switched from RUNNING to FAILED on host-taskmanager-1-1 @ ip(dataPort=36519). 
{noformat}
{noformat}
An OperatorEvent from an OperatorCoordinator to a task was lost. Triggering 
task failover to ensure consistency. Event: 'AddSplitEvents[[[B@722a23fa]]', 
targetTask: Source (1/2) - execution #0
Caused by:
org.apache.flink.runtime.operators.coordination.TaskNotRunningException: Task 
is not running, but in state FINISHED
   at 
org.apache.flink.runtime.taskmanager.Task.deliverOperatorEvent(Task.java:1502)
   at org.apache.flink.runtime.taskexecutor.TaskExecutor.sendOperatorEventToTask
{noformat}

Inside the operator this is processed as:

{noformat}
java.util.concurrent.CompletionException: 
org.apache.flink.runtime.scheduler.stopwithsavepoint.StopWithSavepointStoppingException:
 A savepoint has been created at: s3://..., but the corresponding job 
1b1a3061194c62ded6e2fe823b61b2ea failed during stopping. The savepoint is 
consistent, but might have uncommitted transactions. If you want to commit the 
transaction please restart a job from this savepoint. 

          
java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395) 
          
java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2022) 
          
org.apache.flink.kubernetes.operator.service.AbstractFlinkService.cancelJob(AbstractFlinkService.java:319)
 
          
org.apache.flink.kubernetes.operator.service.NativeFlinkService.cancelJob(NativeFlinkService.java:121)
 
          
org.apache.flink.kubernetes.operator.reconciler.deployment.ApplicationReconciler.cancelJob(ApplicationReconciler.java:223)
 
          
org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractJobReconciler.reconcileSpecChange(AbstractJobReconciler.java:122)
 
         
org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractFlinkResourceReconciler.reconcile(AbstractFlinkResourceReconciler.java:163)
          
org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.reconcile(FlinkDeploymentController.java:136)
 
          
org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.reconcile(FlinkDeploymentController.java:56)
 
          
io.javaoperatorsdk.operator.processing.Controller$1.execute(Controller.java:138)
 
          
io.javaoperatorsdk.operator.processing.Controller$1.execute(Controller.java:96) 
          
org.apache.flink.kubernetes.operator.metrics.OperatorJosdkMetrics.timeControllerExecution(OperatorJosdkMetrics.java:80)
 
          
io.javaoperatorsdk.operator.processing.Controller.reconcile(Controller.java:95) 
          
io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.reconcileExecution(ReconciliationDispatcher.java:139)
 
          
io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleReconcile(ReconciliationDispatcher.java:119)
 
          
io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleDispatch(ReconciliationDispatcher.java:89)
 
          
io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleExecution(ReconciliationDispatcher.java:62)
 
          
io.javaoperatorsdk.operator.processing.event.EventProcessor$ReconcilerExecutor.run(EventProcessor.java:414)
 
          
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) 
          
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) 
          java.lang.Thread.run(Thread.java:829) 
{noformat}

Subsequently we get the following because HA metadata is not available anymore. 
It has been cleared up after the terminal job failure:

{noformat}
org.apache.flink.kubernetes.operator.exception.RecoveryFailureException","message":"HA
 metadata not available to restore from last state. It is possible that the job 
has finished or terminally failed, or the configmaps have been deleted. 
{noformat}

The deployment needs to be manually restored from a savepoint.


> Savepoint upgrade mode fails despite the savepoint succeeding
> -------------------------------------------------------------
>
>                 Key: FLINK-33522
>                 URL: https://issues.apache.org/jira/browse/FLINK-33522
>             Project: Flink
>          Issue Type: Bug
>          Components: Kubernetes Operator
>    Affects Versions: kubernetes-operator-1.6.0, kubernetes-operator-1.6.1
>            Reporter: Maximilian Michels
>            Assignee: Maximilian Michels
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: kubernetes-operator-1.7.0
>
>
> Under certain circumstances, savepoint creation can succeed but the job fails 
> afterwards. One example is when there are messages being distributed by the 
> source coordinator to finished tasks. This is possibly a Flink bug although 
> it's not clear yet how to solve the issue.
> After the savepoint succeeded Flink fails the job like this:
> {noformat}
> Source (1/2) 
> (cd4d56ddb71c0e763cc400bcfe2fd8ac_4081cf0163fcce7fe6af0cf07ad2d43c_0_0) 
> switched from RUNNING to FAILED on host-taskmanager-1-1 @ ip(dataPort=36519). 
> {noformat}
> {noformat}
> An OperatorEvent from an OperatorCoordinator to a task was lost. Triggering 
> task failover to ensure consistency. Event: 'AddSplitEvents[[[B@722a23fa]]', 
> targetTask: Source (1/2) - execution #0
> Caused by:
> org.apache.flink.runtime.operators.coordination.TaskNotRunningException: Task 
> is not running, but in state FINISHED
>    at 
> org.apache.flink.runtime.taskmanager.Task.deliverOperatorEvent(Task.java:1502)
>    at 
> org.apache.flink.runtime.taskexecutor.TaskExecutor.sendOperatorEventToTask
> {noformat}
> Inside the operator this is processed as:
> {noformat}
> java.util.concurrent.CompletionException: 
> org.apache.flink.runtime.scheduler.stopwithsavepoint.StopWithSavepointStoppingException:
>  A savepoint has been created at: s3://..., but the corresponding job 
> 1b1a3061194c62ded6e2fe823b61b2ea failed during stopping. The savepoint is 
> consistent, but might have uncommitted transactions. If you want to commit 
> the transaction please restart a job from this savepoint. 
>           
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395) 
>           
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2022) 
>           
> org.apache.flink.kubernetes.operator.service.AbstractFlinkService.cancelJob(AbstractFlinkService.java:319)
>  
>           
> org.apache.flink.kubernetes.operator.service.NativeFlinkService.cancelJob(NativeFlinkService.java:121)
>  
>           
> org.apache.flink.kubernetes.operator.reconciler.deployment.ApplicationReconciler.cancelJob(ApplicationReconciler.java:223)
>  
>           
> org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractJobReconciler.reconcileSpecChange(AbstractJobReconciler.java:122)
>  
>          
> org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractFlinkResourceReconciler.reconcile(AbstractFlinkResourceReconciler.java:163)
>           
> org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.reconcile(FlinkDeploymentController.java:136)
>  
>           
> org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.reconcile(FlinkDeploymentController.java:56)
>  
>           
> io.javaoperatorsdk.operator.processing.Controller$1.execute(Controller.java:138)
>  
>           
> io.javaoperatorsdk.operator.processing.Controller$1.execute(Controller.java:96)
>  
>           
> org.apache.flink.kubernetes.operator.metrics.OperatorJosdkMetrics.timeControllerExecution(OperatorJosdkMetrics.java:80)
>  
>           
> io.javaoperatorsdk.operator.processing.Controller.reconcile(Controller.java:95)
>  
>           
> io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.reconcileExecution(ReconciliationDispatcher.java:139)
>  
>           
> io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleReconcile(ReconciliationDispatcher.java:119)
>  
>           
> io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleDispatch(ReconciliationDispatcher.java:89)
>  
>           
> io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleExecution(ReconciliationDispatcher.java:62)
>  
>           
> io.javaoperatorsdk.operator.processing.event.EventProcessor$ReconcilerExecutor.run(EventProcessor.java:414)
>  
>           
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>  
>           
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>  
>           java.lang.Thread.run(Thread.java:829) 
> {noformat}
> Subsequently we get the following because HA metadata is not available 
> anymore. It has been cleared up after the terminal job failure:
> {noformat}
> org.apache.flink.kubernetes.operator.exception.RecoveryFailureException","message":"HA
>  metadata not available to restore from last state. It is possible that the 
> job has finished or terminally failed, or the configmaps have been deleted. 
> {noformat}
> The deployment needs to be manually restored from a savepoint.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to