[PR] [FLINK-33522] Use created savepoint even if job cancellation fails [flink-kubernetes-operator]

2023-11-10 Thread via GitHub


mxm opened a new pull request, #706:
URL: https://github.com/apache/flink-kubernetes-operator/pull/706

   
   
   Under certain circumstances, savepoint creation can succeed but the job fails afterwards. One example is when the source coordinator distributes messages to tasks that have already finished. This is possibly a Flink bug, although it is not clear how to solve it.
   
   After the savepoint succeeds, Flink fails the job like this:
   
   ```
   Source (1/2) (cd4d56ddb71c0e763cc400bcfe2fd8ac_4081cf0163fcce7fe6af0cf07ad2d43c_0_0) switched from RUNNING to FAILED on host-taskmanager-1-1 @ ip(dataPort=36519).
   ```
   
   ```
   An OperatorEvent from an OperatorCoordinator to a task was lost. Triggering task failover to ensure consistency. Event: 'AddSplitEvents[[[B@722a23fa]]', targetTask: Source (1/2) - execution #0
   Caused by: org.apache.flink.runtime.operators.coordination.TaskNotRunningException: Task is not running, but in state FINISHED
       at org.apache.flink.runtime.taskmanager.Task.deliverOperatorEvent(Task.java:1502)
       at org.apache.flink.runtime.taskexecutor.TaskExecutor.sendOperatorEventToTask
   ```
   
   Inside the operator this is processed as:
   
   ```
   java.util.concurrent.CompletionException: org.apache.flink.runtime.scheduler.stopwithsavepoint.StopWithSavepointStoppingException: A savepoint has been created at: s3://..., but the corresponding job 1b1a3061194c62ded6e2fe823b61b2ea failed during stopping. The savepoint is consistent, but might have uncommitted transactions. If you want to commit the transaction please restart a job from this savepoint.
       at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
       at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2022)
       at org.apache.flink.kubernetes.operator.service.AbstractFlinkService.cancelJob(AbstractFlinkService.java:319)
       at org.apache.flink.kubernetes.operator.service.NativeFlinkService.cancelJob(NativeFlinkService.java:121)
       at org.apache.flink.kubernetes.operator.reconciler.deployment.ApplicationReconciler.cancelJob(ApplicationReconciler.java:223)
       at org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractJobReconciler.reconcileSpecChange(AbstractJobReconciler.java:122)
       at org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractFlinkResourceReconciler.reconcile(AbstractFlinkResourceReconciler.java:163)
       at org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.reconcile(FlinkDeploymentController.java:136)
       at org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.reconcile(FlinkDeploymentController.java:56)
       at io.javaoperatorsdk.operator.processing.Controller$1.execute(Controller.java:138)
       at io.javaoperatorsdk.operator.processing.Controller$1.execute(Controller.java:96)
       at org.apache.flink.kubernetes.operator.metrics.OperatorJosdkMetrics.timeControllerExecution(OperatorJosdkMetrics.java:80)
       at io.javaoperatorsdk.operator.processing.Controller.reconcile(Controller.java:95)
       at io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.reconcileExecution(ReconciliationDispatcher.java:139)
       at io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleReconcile(ReconciliationDispatcher.java:119)
       at io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleDispatch(ReconciliationDispatcher.java:89)
       at io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleExecution(ReconciliationDispatcher.java:62)
       at io.javaoperatorsdk.operator.processing.event.EventProcessor$ReconcilerExecutor.run(EventProcessor.java:414)
       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
       at java.lang.Thread.run(Thread.java:829)
   ```
   
   Subsequently, we get the following because the HA metadata is no longer available; it has been cleaned up after the terminal job failure:
   
   ```
   org.apache.flink.kubernetes.operator.exception.RecoveryFailureException: "HA metadata not available to restore from last state. It is possible that the job has finished or terminally failed, or the configmaps have been deleted."
   ```
   
   The deployment then needs to be manually restored from a savepoint. This PR allows the cluster upgrade to succeed by recovering the savepoint location info and preventing the upgrade process from being disrupted.
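   
   For context, the change boils down to this idea: when stop-with-savepoint fails, inspect the failure and, if it wraps a StopWithSavepointStoppingException, keep the savepoint location instead of discarding it. The snippet below is a minimal sketch of that idea rather than the exact PR code; the helper name is illustrative, and it assumes the exception exposes the completed savepoint's location via getSavepointPath() (as its message suggests).
   
   ```
   import java.util.Optional;
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.ExecutionException;
   
   import org.apache.flink.runtime.scheduler.stopwithsavepoint.StopWithSavepointStoppingException;
   import org.apache.flink.util.ExceptionUtils;
   
   class SavepointRecoverySketch {
   
       /**
        * Waits for stop-with-savepoint and, if the job failed while stopping but the
        * savepoint itself completed, still returns the savepoint location so the
        * upgrade can proceed from it.
        */
       static Optional<String> stopWithSavepointAndRecover(CompletableFuture<String> stopFuture)
               throws InterruptedException {
           try {
               // Happy path: the job stopped cleanly and the future holds the savepoint path.
               return Optional.of(stopFuture.get());
           } catch (ExecutionException e) {
               // The savepoint may still have been written even though stopping failed
               // (see the StopWithSavepointStoppingException in the stack trace above).
               return ExceptionUtils.findThrowable(e, StopWithSavepointStoppingException.class)
                       // Assumption: the exception carries the completed savepoint's location.
                       .map(StopWithSavepointStoppingException::getSavepointPath);
           }
       }
   }
   ```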
   



Re: [PR] [FLINK-33522] Use created savepoint even if job cancellation fails [flink-kubernetes-operator]

2023-11-11 Thread via GitHub


gyfora commented on code in PR #706:
URL: https://github.com/apache/flink-kubernetes-operator/pull/706#discussion_r1390245416


##
flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkServiceTest.java:
##
@@ -673,9 +693,11 @@ public void nativeSavepointFormatTest() throws Exception {
                         .set(OPERATOR_SAVEPOINT_FORMAT_TYPE, SavepointFormatType.NATIVE),
                 false);
         assertTrue(stopWithSavepointFuture.isDone());
-        assertEquals(jobID, stopWithSavepointFuture.get().f0);
-        assertEquals(SavepointFormatType.NATIVE, stopWithSavepointFuture.get().f1);
-        assertEquals(savepointPath, stopWithSavepointFuture.get().f2);
+        if (!failAfterSavepointCompletes) {
+            assertEquals(jobID, stopWithSavepointFuture.get().f0);
+            assertEquals(SavepointFormatType.NATIVE, stopWithSavepointFuture.get().f1);
+            assertEquals(savepointPath, stopWithSavepointFuture.get().f2);
+        }

Review Comment:
   Am I missing something here, or why are we not validating the savepoint path in both the failure and non-failure cases?






Re: [PR] [FLINK-33522] Use created savepoint even if job cancellation fails [flink-kubernetes-operator]

2023-11-13 Thread via GitHub


mxm commented on code in PR #706:
URL: https://github.com/apache/flink-kubernetes-operator/pull/706#discussion_r1390909122


##
flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkServiceTest.java:
##
@@ -673,9 +693,11 @@ public void nativeSavepointFormatTest() throws Exception {
                         .set(OPERATOR_SAVEPOINT_FORMAT_TYPE, SavepointFormatType.NATIVE),
                 false);
         assertTrue(stopWithSavepointFuture.isDone());
-        assertEquals(jobID, stopWithSavepointFuture.get().f0);
-        assertEquals(SavepointFormatType.NATIVE, stopWithSavepointFuture.get().f1);
-        assertEquals(savepointPath, stopWithSavepointFuture.get().f2);
+        if (!failAfterSavepointCompletes) {
+            assertEquals(jobID, stopWithSavepointFuture.get().f0);
+            assertEquals(SavepointFormatType.NATIVE, stopWithSavepointFuture.get().f1);
+            assertEquals(savepointPath, stopWithSavepointFuture.get().f2);
+        }

Review Comment:
   The test asserts on the savepoint future, which is an implementation detail the test really shouldn't depend on. That ends up testing the test's own plumbing rather than the production code itself. You are right, though, that we shouldn't completely skip these assertions. I've changed the test to assert on the actual persisted savepoint info.
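   
   To illustrate the direction (not the actual test code): the assertion targets the savepoint info that ends up persisted in the resource status, so it holds regardless of whether the stop-with-savepoint future completed normally or exceptionally. The types below (SavepointRecord, InMemoryStatusStore) are hypothetical stand-ins for the operator's status model, used only to show the shape of such an assertion.
   
   ```
   import static org.junit.jupiter.api.Assertions.assertEquals;
   
   import org.junit.jupiter.api.Test;
   
   class PersistedSavepointAssertionSketch {
   
       // Hypothetical record standing in for the savepoint info kept in the CR status.
       record SavepointRecord(String location) {}
   
       // Hypothetical store standing in for the deployment status the service updates.
       static final class InMemoryStatusStore {
           private SavepointRecord lastSavepoint;
   
           void recordSavepoint(String location) {
               this.lastSavepoint = new SavepointRecord(location);
           }
   
           SavepointRecord lastSavepoint() {
               return lastSavepoint;
           }
       }
   
       @Test
       void savepointLocationIsPersistedEvenWhenStoppingFails() {
           var status = new InMemoryStatusStore();
   
           // Simulate the service recovering the savepoint path from a failed
           // stop-with-savepoint and recording it in the status anyway.
           status.recordSavepoint("s3://bucket/savepoints/sp-1");
   
           // Assert on the persisted info, not on the intermediate future.
           assertEquals("s3://bucket/savepoints/sp-1", status.lastSavepoint().location());
       }
   }
   ```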






Re: [PR] [FLINK-33522] Use created savepoint even if job cancellation fails [flink-kubernetes-operator]

2023-11-13 Thread via GitHub


mxm merged PR #706:
URL: https://github.com/apache/flink-kubernetes-operator/pull/706

