Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/2608#discussion_r82971228 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java --- @@ -282,29 +279,71 @@ public boolean isShutdown() { // Handling checkpoints and messages // -------------------------------------------------------------------------------------------- - public Future<String> triggerSavepoint(long timestamp) throws Exception { - CheckpointTriggerResult result = triggerCheckpoint(timestamp, CheckpointProperties.forStandardSavepoint()); + /** + * Triggers a savepoint with the default savepoint directory as a target. + * + * @param timestamp The timestamp for the savepoint. + * @return A future to the completed checkpoint + * @throws IllegalStateException If no default savepoint directory has been configured + * @throws Exception Failures during triggering are forwarded + */ + public Future<CompletedCheckpoint> triggerSavepoint(long timestamp) throws Exception { + return triggerSavepoint(timestamp, null); + } + + /** + * Triggers a savepoint with the given savepoint directory as a target. + * + * @param timestamp The timestamp for the savepoint. + * @param savepointDirectory Target directory for the savepoint. + * @return A future to the completed checkpoint + * @throws IllegalStateException If no savepoint directory has been + * specified and no default savepoint directory has been + * configured + * @throws Exception Failures during triggering are forwarded + */ + public Future<CompletedCheckpoint> triggerSavepoint(long timestamp, String savepointDirectory) throws Exception { + String targetDirectory; + if (savepointDirectory != null) { + targetDirectory = savepointDirectory; + } else if (this.savepointDirectory != null) { + targetDirectory = this.savepointDirectory; + } else { + throw new IllegalStateException("No savepoint directory configured. " + + "You can either specify a directory when triggering this savepoint or " + + "configure a cluster-wide default via key '" + + ConfigConstants.SAVEPOINT_DIRECTORY_KEY + "'."); + } + + CheckpointProperties props = CheckpointProperties.forStandardSavepoint(); + CheckpointTriggerResult result = triggerCheckpoint(timestamp, props, targetDirectory); if (result.isSuccess()) { - PendingSavepoint savepoint = (PendingSavepoint) result.getPendingCheckpoint(); - return savepoint.getCompletionFuture(); - } - else { - return Futures.failed(new Exception("Failed to trigger savepoint: " + result.getFailureReason().message())); + return result.getPendingCheckpoint().getCompletionFuture(); + } else { + CompletableFuture<CompletedCheckpoint> failed = new FlinkCompletableFuture<>(); --- End diff -- I was looking for that one, but didn't find it. Thanks!
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---