Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5560#discussion_r170300254 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java --- @@ -534,42 +536,94 @@ public void postStop() throws Exception { // 4. take a savepoint final CompletableFuture<String> savepointFuture = triggerSavepoint( - jobMasterConfiguration.getTmpDirectory(), - timeout); + null, + timeout) + .handleAsync( + (String savepointPath, Throwable throwable) -> { + if (throwable != null) { + final Throwable strippedThrowable = ExceptionUtils.stripCompletionException(throwable); + if (strippedThrowable instanceof CheckpointTriggerException) { + final CheckpointTriggerException checkpointTriggerException = (CheckpointTriggerException) strippedThrowable; + + if (checkpointTriggerException.getCheckpointDeclineReason() == CheckpointDeclineReason.NOT_ALL_REQUIRED_TASKS_RUNNING) { + return lastInternalSavepoint; + } else { + throw new CompletionException(checkpointTriggerException); + } + } else { + throw new CompletionException(strippedThrowable); + } + } else { + final String savepointToDispose = lastInternalSavepoint; --- End diff -- I think `savepointToDispose` can be `null`.
---