[ https://issues.apache.org/jira/browse/FLINK-8459?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16383616#comment-16383616 ]
ASF GitHub Bot commented on FLINK-8459:
---------------------------------------
Github user GJL commented on a diff in the pull request:
https://github.com/apache/flink/pull/5622#discussion_r171857517
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java ---
@@ -971,14 +971,44 @@ public void heartbeatFromResourceManager(final ResourceID resourceID) {
 	@Override
 	public CompletableFuture<String> triggerSavepoint(
-			@Nullable final String targetDirectory,
-			final Time timeout) {
-		try {
-			return executionGraph.getCheckpointCoordinator()
-				.triggerSavepoint(System.currentTimeMillis(), targetDirectory)
-				.thenApply(CompletedCheckpoint::getExternalPointer);
-		} catch (Exception e) {
-			return FutureUtils.completedExceptionally(e);
+			@Nullable final String targetDirectory,
+			final boolean cancelJob,
+			final Time timeout) {
+
+		final CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator();
+		if (checkpointCoordinator == null) {
+			return FutureUtils.completedExceptionally(new IllegalStateException(
+				String.format("Job %s is not a streaming job.", jobGraph.getJobID())));
+		}
+
+		if (cancelJob) {
+			checkpointCoordinator.stopCheckpointScheduler();
+		}
+		return checkpointCoordinator
+			.triggerSavepoint(System.currentTimeMillis(), targetDirectory)
+			.thenApply(CompletedCheckpoint::getExternalPointer)
+			.thenApplyAsync(path -> {
+				if (cancelJob) {
+					log.info("Savepoint stored in {}. Now cancelling {}.", path, jobGraph.getJobID());
+					cancel(timeout);
+				}
+				return path;
+			}, getMainThreadExecutor())
+			.exceptionally(throwable -> {
+				if (cancelJob) {
+					startCheckpointScheduler(checkpointCoordinator);
--- End diff --
If the cancellation fails, we restart the checkpoint scheduler as well, since the
exceptionally handler is chained after the stage that calls cancel(timeout). I
think this differs from the previous implementation.
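A minimal, self-contained sketch of why this happens, using plain CompletableFuture: an exceptionally handler at the end of a chain sees failures from every earlier stage, not only from the savepoint. The savepoint is modelled as an already-completed future, the checkpoint scheduler as a flag, and the cancel step as a stage that throws synchronously; the class and method names are hypothetical and are not Flink APIs.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Hypothetical model of the chain in the diff: the "scheduler" is a flag,
 * the savepoint always succeeds, and the cancel stage may throw.
 */
public class ExceptionallyScopeDemo {

    /** Returns whether the simulated checkpoint scheduler is running at the end. */
    public static boolean schedulerRunningAfter(boolean cancelFails) {
        AtomicBoolean schedulerRunning = new AtomicBoolean(true);

        // cancelJob == true: stop the scheduler before the savepoint is triggered.
        schedulerRunning.set(false);

        CompletableFuture<String> result = CompletableFuture
            .completedFuture("file:///savepoints/sp-1") // the savepoint succeeded
            .thenApply(path -> {
                // Stand-in for the stage that calls cancel(timeout); fails synchronously here.
                if (cancelFails) {
                    throw new IllegalStateException("cancellation failed");
                }
                return path;
            })
            .exceptionally(throwable -> {
                // Runs for a failure in ANY earlier stage, including the cancel stage.
                schedulerRunning.set(true);
                throw new CompletionException(throwable);
            });

        try {
            result.join();
        } catch (CompletionException e) {
            // The failure is still reported to the caller.
        }
        return schedulerRunning.get();
    }

    public static void main(String[] args) {
        System.out.println("cancel ok:     scheduler running = " + schedulerRunningAfter(false)); // false
        System.out.println("cancel failed: scheduler running = " + schedulerRunningAfter(true));  // true
    }
}
```

Restricting the restart to savepoint failures would mean attaching the handler to the savepoint future itself, before the stage that triggers the cancellation.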
> Implement cancelWithSavepoint in RestClusterClient
> --------------------------------------------------
>
> Key: FLINK-8459
> URL: https://issues.apache.org/jira/browse/FLINK-8459
> Project: Flink
> Issue Type: Sub-task
> Components: Client
> Affects Versions: 1.5.0
> Reporter: Gary Yao
> Assignee: Gary Yao
> Priority: Blocker
> Labels: flip-6
> Fix For: 1.5.0
>
>
> Implement the method
> {{RestClusterClient#cancelWithSavepoint(JobID jobId, @Nullable String
> savepointDirectory)}}
> by either taking a savepoint and cancelling the job separately, or by migrating
> the logic in {{JobCancellationWithSavepointHandlers}}. The former will have
> different semantics because the checkpoint scheduler is not stopped. Thus it
> is not guaranteed that no additional checkpoints are taken between the
> savepoint and the job cancellation.
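A minimal sketch of the second option's ordering, assuming the intended semantics are: stop the scheduler, trigger the savepoint, restart the scheduler only if the savepoint fails, and cancel the job only after the savepoint succeeds. The class, the event strings, and the Supplier standing in for triggerSavepoint are hypothetical; they model the CheckpointCoordinator and JobMaster calls and are not Flink APIs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Supplier;

/**
 * Hypothetical model of cancel-with-savepoint ordering. The event strings stand
 * in for scheduler and cancellation calls; nothing here is a Flink API.
 */
public class CancelWithSavepointSketch {

    private final List<String> events = new ArrayList<>();

    CompletableFuture<String> cancelWithSavepoint(Supplier<CompletableFuture<String>> triggerSavepoint) {
        // Stop periodic checkpoints first, so none can be taken between
        // the savepoint and the cancellation.
        events.add("stopScheduler");

        return triggerSavepoint.get()
            .whenComplete((path, failure) -> {
                // Scoped to the savepoint stage only: a later failure in the
                // cancel step never reaches this callback.
                if (failure != null) {
                    events.add("startScheduler");
                }
            })
            .thenApply(path -> {
                // Runs only if the savepoint succeeded.
                events.add("cancel");
                return path;
            });
    }

    /** Runs one scenario and returns the recorded events in order. */
    public static List<String> run(boolean savepointFails) {
        CancelWithSavepointSketch sketch = new CancelWithSavepointSketch();
        CompletableFuture<String> savepoint = new CompletableFuture<>();
        if (savepointFails) {
            savepoint.completeExceptionally(new IllegalStateException("savepoint failed"));
        } else {
            savepoint.complete("file:///savepoints/sp-1");
        }
        try {
            sketch.cancelWithSavepoint(() -> savepoint).join();
        } catch (CompletionException e) {
            // The savepoint failure is propagated to the caller.
        }
        return sketch.events;
    }

    public static void main(String[] args) {
        System.out.println(run(false)); // [stopScheduler, cancel]
        System.out.println(run(true));  // [stopScheduler, startScheduler]
    }
}
```

Placing whenComplete before thenApply keeps the original failure propagating to the caller, while the restart logic never observes errors from the cancel step.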
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)