qinf commented on code in PR #1003:
URL:
https://github.com/apache/flink-kubernetes-operator/pull/1003#discussion_r2255715719
##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java:
##########
@@ -560,6 +560,12 @@ public Optional<Savepoint> getLastCheckpoint(JobID jobId, Configuration conf) {
&& e.getMessage().contains("Checkpointing has not been enabled")) {
LOG.warn("Checkpointing not enabled for job {}", jobId, e);
return Optional.empty();
+ } else if (e instanceof ExecutionException
+ && e.getMessage() != null
+ && e.getMessage()
+ .contains(String.format("Job %s not found", jobId.toString()))) {
+ LOG.warn("Job {} not found", jobId, e);
Review Comment:
@1996fanrui Thanks for the quick feedback. I have added the background, and here is an example as well.
For the FINISHED job `6f4257aaf163d19a6fa1519479795c11`:
1. `GET /jobs/:jobid` (here `GET /jobs/6f4257aaf163d19a6fa1519479795c11`) returns:
```json
{
  "jid": "6f4257aaf163d19a6fa1519479795c11",
  "name": "app442315instance1204699",
  "isStoppable": false,
  "state": "FINISHED",
  "start-time": 1754445330065,
  "end-time": 1754445366256,
  "duration": 36191,
  "maxParallelism": -1,
  "now": 1754445409626,
  "timestamps": {
    "CREATED": 1754445338012,
    "RUNNING": 1754445338616,
    "CANCELED": 0,
    "INITIALIZING": 1754445330065,
    "RECONCILING": 0,
    "FINISHED": 1754445366256,
    "FAILED": 0,
    "CANCELLING": 0,
    "FAILING": 0,
    "RESTARTING": 0,
    "SUSPENDED": 0
  },
  ......
}
```
2. `GET /jobs/:jobid/checkpoints` (here `GET /jobs/6f4257aaf163d19a6fa1519479795c11/checkpoints`) returns the following error:
```json
{
  "errors": [
    "org.apache.flink.runtime.rest.NotFoundException: Job 6f4257aaf163d19a6fa1519479795c11 not found\n\tat org.apache.flink.runtime.rest.handler.job.checkpoints.AbstractCheckpointStatsHandler.lambda$handleRequest$2(AbstractCheckpointStatsHandler.java:102)\n\tat java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:884)\n\tat java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:866)\n\tat ..."
  ]
}
```
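The check in the diff matches on the message of the `ExecutionException`. In plain Java, `new ExecutionException(cause)` uses `cause.toString()` as its message, so the `Job ... not found` text of the wrapped error should still be visible there. A minimal, self-contained sketch of that condition, assuming a plain `String` job ID in place of Flink's `JobID` and a `RuntimeException` standing in for the REST client's exception (`JobNotFoundCheck` and `isJobNotFound` are hypothetical names, not operator code):

```java
import java.util.concurrent.ExecutionException;

class JobNotFoundCheck {

    // Hypothetical helper mirroring the condition added in the diff: the failure
    // must be an ExecutionException whose message mentions "Job <jobId> not found".
    static boolean isJobNotFound(Exception e, String jobId) {
        return e instanceof ExecutionException
                && e.getMessage() != null
                && e.getMessage().contains(String.format("Job %s not found", jobId));
    }

    public static void main(String[] args) {
        String jobId = "6f4257aaf163d19a6fa1519479795c11";
        // new ExecutionException(cause) uses cause.toString() as its message,
        // so the text of the wrapped error is preserved.
        Exception notFound = new ExecutionException(
                new RuntimeException("NotFoundException: Job " + jobId + " not found"));
        Exception notEnabled = new ExecutionException(
                new RuntimeException("Checkpointing has not been enabled"));

        System.out.println(isJobNotFound(notFound, jobId));   // true
        System.out.println(isJobNotFound(notEnabled, jobId)); // false
    }
}
```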
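So while `/jobs/:jobid` still reports the job as FINISHED, `/jobs/:jobid/checkpoints` answers `Job ... not found`. Catching this exception in `getLastCheckpoint()`, as the current change does, may be somewhat unintuitive. Another option is to catch the `Job not found` exception in `observeLatestCheckpoint()`: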
```Java
private void observeLatestCheckpoint(FlinkResourceContext<CR> ctx, String jobId) {
    var status = ctx.getResource().getStatus();
    var jobStatus = status.getJobStatus();
    ctx.getFlinkService()
            .getLastCheckpoint(JobID.fromHexString(jobId), ctx.getObserveConfig())
            .ifPresentOrElse(
                    snapshot -> jobStatus.setUpgradeSavepointPath(snapshot.getLocation()),
                    () -> {
                        if (ReconciliationUtils.isJobCancelled(status)) {
                            // For cancelled jobs the observed savepoint is always definite,
                            // so if empty we know the job doesn't have any
                            // checkpoints/savepoints
                            jobStatus.setUpgradeSavepointPath(null);
                        }
                    });
}
```
and check whether the job is FINISHED with:
```Java
JobStatus.FINISHED == resource.getStatus().getJobStatus().getState();
```
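A rough, self-contained sketch of that alternative, combining the catch in `observeLatestCheckpoint()` with the FINISHED check so the exception is only tolerated for finished jobs. Everything here is a hypothetical stand-in: `JobState`, `JobStatusView` and `CheckpointFetcher` replace the operator's job status classes and `getLastCheckpoint()`, the `CANCELED` check replaces `ReconciliationUtils.isJobCancelled(status)`, and keeping the previously recorded `upgradeSavepointPath` for a finished job is an assumption about the desired behavior, not what the operator does today:

```java
import java.util.Optional;
import java.util.concurrent.ExecutionException;

class LatestCheckpointObserver {

    // Hypothetical subset of job states used only in this sketch.
    enum JobState { RUNNING, FINISHED, CANCELED }

    // Hypothetical stand-in for the resource's job status.
    static final class JobStatusView {
        final JobState state;
        String upgradeSavepointPath;

        JobStatusView(JobState state, String upgradeSavepointPath) {
            this.state = state;
            this.upgradeSavepointPath = upgradeSavepointPath;
        }
    }

    // Hypothetical stand-in for FlinkService#getLastCheckpoint.
    interface CheckpointFetcher {
        Optional<String> fetch() throws Exception;
    }

    static void observeLatestCheckpoint(JobStatusView status, String jobId, CheckpointFetcher fetcher) {
        Optional<String> checkpoint;
        try {
            checkpoint = fetcher.fetch();
        } catch (Exception e) {
            boolean jobNotFound = e instanceof ExecutionException
                    && e.getMessage() != null
                    && e.getMessage().contains(String.format("Job %s not found", jobId));
            if (status.state == JobState.FINISHED && jobNotFound) {
                // Assumption: the finished job is already gone from the JobManager,
                // so keep the last known path instead of failing the observation.
                return;
            }
            // Any other failure is still surfaced (unchecked to keep the sketch short).
            throw new IllegalStateException("Could not observe latest checkpoint", e);
        }
        checkpoint.ifPresentOrElse(
                location -> status.upgradeSavepointPath = location,
                () -> {
                    if (status.state == JobState.CANCELED) {
                        // Stand-in for the cancelled-job branch of the original method.
                        status.upgradeSavepointPath = null;
                    }
                });
    }

    public static void main(String[] args) {
        String jobId = "6f4257aaf163d19a6fa1519479795c11";
        JobStatusView finished = new JobStatusView(JobState.FINISHED, "s3://bucket/chk-42");
        observeLatestCheckpoint(finished, jobId, () -> {
            throw new ExecutionException(new RuntimeException("Job " + jobId + " not found"));
        });
        System.out.println(finished.upgradeSavepointPath); // s3://bucket/chk-42
    }
}
```

Gating on the FINISHED state keeps a `Job not found` error for a RUNNING job visible, where it would more likely point to a real problem.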
Do you have any suggestions?