Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2608#discussion_r82971228
  
    --- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 ---
    @@ -282,29 +279,71 @@ public boolean isShutdown() {
        //  Handling checkpoints and messages
        // 
--------------------------------------------------------------------------------------------
     
    -   public Future<String> triggerSavepoint(long timestamp) throws Exception 
{
    -           CheckpointTriggerResult result = triggerCheckpoint(timestamp, 
CheckpointProperties.forStandardSavepoint());
    +   /**
    +    * Triggers a savepoint with the default savepoint directory as a 
target.
    +    *
    +    * @param timestamp The timestamp for the savepoint.
    +    * @return A future to the completed checkpoint
    +    * @throws IllegalStateException If no default savepoint directory has 
been configured
    +    * @throws Exception Failures during triggering are forwarded
    +    */
    +   public Future<CompletedCheckpoint> triggerSavepoint(long timestamp) 
throws Exception {
    +           return triggerSavepoint(timestamp, null);
    +   }
    +
    +   /**
    +    * Triggers a savepoint with the given savepoint directory as a target.
    +    *
    +    * @param timestamp The timestamp for the savepoint.
    +    * @param savepointDirectory Target directory for the savepoint.
    +    * @return A future to the completed checkpoint
    +    * @throws IllegalStateException If no savepoint directory has been
    +    *                               specified and no default savepoint 
directory has been
    +    *                               configured
    +    * @throws Exception             Failures during triggering are 
forwarded
    +    */
    +   public Future<CompletedCheckpoint> triggerSavepoint(long timestamp, 
String savepointDirectory) throws Exception {
    +           String targetDirectory;
    +           if (savepointDirectory != null) {
    +                   targetDirectory = savepointDirectory;
    +           } else if (this.savepointDirectory != null) {
    +                   targetDirectory = this.savepointDirectory;
    +           } else {
    +                   throw new IllegalStateException("No savepoint directory 
configured. " +
    +                                   "You can either specify a directory 
when triggering this savepoint or " +
    +                                   "configure a cluster-wide default via 
key '" +
    +                                   ConfigConstants.SAVEPOINT_DIRECTORY_KEY 
+ "'.");
    +           }
    +
    +           CheckpointProperties props = 
CheckpointProperties.forStandardSavepoint();
    +           CheckpointTriggerResult result = triggerCheckpoint(timestamp, 
props, targetDirectory);
     
                if (result.isSuccess()) {
    -                   PendingSavepoint savepoint = (PendingSavepoint) 
result.getPendingCheckpoint();
    -                   return savepoint.getCompletionFuture();
    -           }
    -           else {
    -                   return Futures.failed(new Exception("Failed to trigger 
savepoint: " + result.getFailureReason().message()));
    +                   return 
result.getPendingCheckpoint().getCompletionFuture();
    +           } else {
    +                   CompletableFuture<CompletedCheckpoint> failed = new 
FlinkCompletableFuture<>();
    --- End diff --
    
    I was looking for that one, but didn't find it. Thanks! 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to