[ https://issues.apache.org/jira/browse/FLINK-4512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15565535#comment-15565535 ]

ASF GitHub Bot commented on FLINK-4512:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2608#discussion_r82778557
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java ---
    @@ -282,29 +279,71 @@ public boolean isShutdown() {
        //  Handling checkpoints and messages
        // --------------------------------------------------------------------------------------------
     
    -   public Future<String> triggerSavepoint(long timestamp) throws Exception {
    -           CheckpointTriggerResult result = triggerCheckpoint(timestamp, CheckpointProperties.forStandardSavepoint());
    +   /**
    +    * Triggers a savepoint with the default savepoint directory as a target.
    +    *
    +    * @param timestamp The timestamp for the savepoint.
    +    * @return A future to the completed checkpoint
    +    * @throws IllegalStateException If no default savepoint directory has been configured
    +    * @throws Exception Failures during triggering are forwarded
    +    */
    +   public Future<CompletedCheckpoint> triggerSavepoint(long timestamp) throws Exception {
    +           return triggerSavepoint(timestamp, null);
    +   }
    +
    +   /**
    +    * Triggers a savepoint with the given savepoint directory as a target.
    +    *
    +    * @param timestamp The timestamp for the savepoint.
    +    * @param savepointDirectory Target directory for the savepoint.
    +    * @return A future to the completed checkpoint
    +    * @throws IllegalStateException If no savepoint directory has been
    +    *                               specified and no default savepoint directory has been
    +    *                               configured
    +    * @throws Exception             Failures during triggering are forwarded
    +    */
    +   public Future<CompletedCheckpoint> triggerSavepoint(long timestamp, String savepointDirectory) throws Exception {
    +           String targetDirectory;
    +           if (savepointDirectory != null) {
    +                   targetDirectory = savepointDirectory;
    +           } else if (this.savepointDirectory != null) {
    +                   targetDirectory = this.savepointDirectory;
    +           } else {
    +                   throw new IllegalStateException("No savepoint directory configured. " +
    +                                   "You can either specify a directory when triggering this savepoint or " +
    +                                   "configure a cluster-wide default via key '" +
    +                                   ConfigConstants.SAVEPOINT_DIRECTORY_KEY + "'.");
    +           }
    +
    +           CheckpointProperties props = CheckpointProperties.forStandardSavepoint();
    +           CheckpointTriggerResult result = triggerCheckpoint(timestamp, props, targetDirectory);
     
                if (result.isSuccess()) {
    -                   PendingSavepoint savepoint = (PendingSavepoint) result.getPendingCheckpoint();
    -                   return savepoint.getCompletionFuture();
    -           }
    -           else {
    -                   return Futures.failed(new Exception("Failed to trigger savepoint: " + result.getFailureReason().message()));
    +                   return result.getPendingCheckpoint().getCompletionFuture();
    +           } else {
    +                   CompletableFuture<CompletedCheckpoint> failed = new FlinkCompletableFuture<>();
    +                   failed.completeExceptionally(new Exception("Failed to trigger savepoint: " + result.getFailureReason().message()));
    --- End diff --
    
    Is omitting the complete stack trace intentional? I wonder whether including it would help with debugging problems later.
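
To make the question concrete, here is a minimal sketch of chaining a cause onto the exception used to fail the future. It rests on several assumptions: it uses `java.util.concurrent.CompletableFuture` from the JDK rather than Flink's own `FlinkCompletableFuture`, the class `SavepointFailureSketch` and the helper `failedSavepoint` are hypothetical, and it presumes an underlying `Throwable` is available at all. In the diff, the failure reason seems to expose only a message, so there may be no cause to attach.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

class SavepointFailureSketch {

    /**
     * Hypothetical helper: returns a future that is already failed.
     * The cause (may be null) is chained so its stack trace is not lost.
     */
    static <T> CompletableFuture<T> failedSavepoint(String reason, Throwable cause) {
        CompletableFuture<T> failed = new CompletableFuture<>();
        // Exception(String, Throwable) keeps the original throwable attached as the cause.
        failed.completeExceptionally(
                new Exception("Failed to trigger savepoint: " + reason, cause));
        return failed;
    }

    public static void main(String[] args) throws InterruptedException {
        // Assumed root cause, for illustration only.
        Throwable root = new IllegalStateException("coordinator is shut down");
        CompletableFuture<String> future = failedSavepoint("declined", root);
        try {
            future.get();
        } catch (ExecutionException e) {
            // get() wraps the failure; the chained cause is printed as "Caused by: ...".
            e.getCause().printStackTrace();
        }
    }
}
```

With a chained cause, a caller inspecting the failed future sees both the trigger failure message and the stack trace of whatever caused it.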


> Add option for persistent checkpoints
> -------------------------------------
>
>                 Key: FLINK-4512
>                 URL: https://issues.apache.org/jira/browse/FLINK-4512
>             Project: Flink
>          Issue Type: Sub-task
>          Components: State Backends, Checkpointing
>            Reporter: Ufuk Celebi
>            Assignee: Ufuk Celebi
>
> Allow periodic checkpoints to be persisted by writing out their metadata.
> This is what we currently do for savepoints, but in the future checkpoints
> and savepoints are likely to diverge with respect to the guarantees they
> give for updatability, etc.
> This means that, in the long term, the difference between persistent
> checkpoints and savepoints will be that persistent checkpoints can only be
> restored with the same job settings (such as parallelism).
> Regular and persisted checkpoints should behave differently with respect to
> disposal in *globally* terminal job states (FINISHED, CANCELLED, FAILED):
> regular checkpoints are cleaned up in all of these cases, whereas persistent
> checkpoints are cleaned up only on FINISHED, possibly with an option to
> customize the behaviour on CANCELLED or FAILED.
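
The disposal rule described in the issue can be sketched as a small policy object. This is only an illustration under stated assumptions, not the API introduced by the pull request: `CheckpointDisposalSketch`, `TerminalState`, `DisposalPolicy`, and `shouldDiscard` are all hypothetical names.

```java
import java.util.Arrays;
import java.util.EnumSet;
import java.util.Set;

class CheckpointDisposalSketch {

    /** The globally terminal job states named in the issue. */
    enum TerminalState { FINISHED, CANCELLED, FAILED }

    /** Hypothetical policy: the terminal states in which a checkpoint is discarded. */
    static final class DisposalPolicy {
        private final Set<TerminalState> discardStates;

        private DisposalPolicy(Set<TerminalState> discardStates) {
            this.discardStates = discardStates;
        }

        /** Regular checkpoints are cleaned up in every terminal state. */
        static DisposalPolicy regular() {
            return new DisposalPolicy(EnumSet.allOf(TerminalState.class));
        }

        /**
         * Persistent checkpoints are cleaned up on FINISHED only, plus any
         * extra states the user opts into (the "customize" option).
         */
        static DisposalPolicy persistent(TerminalState... alsoDiscardOn) {
            EnumSet<TerminalState> states = EnumSet.of(TerminalState.FINISHED);
            states.addAll(Arrays.asList(alsoDiscardOn));
            return new DisposalPolicy(states);
        }

        boolean shouldDiscard(TerminalState state) {
            return discardStates.contains(state);
        }
    }

    public static void main(String[] args) {
        DisposalPolicy regular = DisposalPolicy.regular();
        DisposalPolicy persistent = DisposalPolicy.persistent();
        for (TerminalState s : TerminalState.values()) {
            System.out.println(s + ": regular=" + regular.shouldDiscard(s)
                    + ", persistent=" + persistent.shouldDiscard(s));
        }
    }
}
```

Modelling the policy as a set of states keeps the customization point explicit: opting into cleanup on CANCELLED is `DisposalPolicy.persistent(TerminalState.CANCELLED)` in this sketch.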



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
