tillrohrmann commented on a change in pull request #17443:
URL: https://github.com/apache/flink/pull/17443#discussion_r741964620



##########
File path: docs/content.zh/docs/deployment/cli.md
##########
@@ -121,6 +122,8 @@ You can resume your program from this savepoint with the run command.
 The savepoint folder is optional and needs to be specified if
 [state.savepoints.dir]({{< ref "docs/deployment/config" >}}#state-savepoints-dir) isn't set.
 
+The savepoint timeout is optional and checkpoint timeout will take effect if isn't set.

Review comment:
       ```suggestion
   The savepoint timeout `--savepointTimeout` is optional and the checkpoint timeout will take effect if it isn't set.
   ```

##########
File path: 
flink-clients/src/main/java/org/apache/flink/client/cli/SavepointOptions.java
##########
@@ -30,13 +31,18 @@
     private boolean dispose;
     private String disposeSavepointPath;
     private String jarFile;
+    private long savepointTimeout;

Review comment:
       Since we are not storing the unit explicitly:
   
   ```suggestion
       private long savepointTimeoutMs;
   ```

##########
File path: docs/content.zh/docs/deployment/cli.md
##########
@@ -172,6 +176,8 @@ Savepoint completed. Path: file:/tmp/flink-savepoints/savepoint-cca7bc-bb1e257f0
 We have to use `--savepointPath` to specify the savepoint folder if
 [state.savepoints.dir]({{< ref "docs/deployment/config" >}}#state-savepoints-dir) isn't set.
 
+The savepoint timeout is optional and checkpoint timeout will take effect if isn't set.

Review comment:
       ```suggestion
   The savepoint timeout `--savepointTimeout` is optional and the checkpoint timeout will take effect if it isn't set.
   ```

##########
File path: 
flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
##########
@@ -132,6 +132,9 @@
     static final Option SAVEPOINT_DISPOSE_OPTION =
             new Option("d", "dispose", true, "Path of savepoint to dispose.");
 
+    static final Option SAVEPOINT_TIMEOUT_OPTION =
+            new Option("st", "savepointTimeout", true, "timeout of savepoint.");

Review comment:
       ```suggestion
               new Option("st", "savepointTimeout", true, "The maximum completion time a savepoint is allowed to take before it is failed.");
   ```

##########
File path: docs/content/docs/deployment/cli.md
##########
@@ -170,6 +174,8 @@ Savepoint completed. Path: file:/tmp/flink-savepoints/savepoint-cca7bc-bb1e257f0
 We have to use `--savepointPath` to specify the savepoint folder if
 [state.savepoints.dir]({{< ref "docs/deployment/config" >}}#state-savepoints-dir) isn't set.
 
+The savepoint timeout is optional and checkpoint timeout will take effect if isn't set.

Review comment:
       ```suggestion
   The savepoint timeout `--savepointTimeout` is optional and the checkpoint timeout will take effect if it isn't set.
   ```

##########
File path: 
flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
##########
@@ -167,6 +187,20 @@
      */
     CompletableFuture<String> triggerSavepoint(JobID jobId, @Nullable String savepointDirectory);
 
+    /**
+     * Triggers a savepoint for the job identified by the job id. The savepoint will be written to
+     * the given savepoint directory, or {@link
+     * org.apache.flink.configuration.CheckpointingOptions#SAVEPOINT_DIRECTORY} if it is null.
+     *
+     * @param jobId job id
+     * @param savepointDirectory directory the savepoint should be written to
+     * @param savepointTimeout Timeout for the savepoint. If it <= 0, checkpoint timeout will take
+     *     effect.
+     * @return path future where the savepoint is located
+     */
+    CompletableFuture<String> triggerSavepoint(
+            JobID jobId, @Nullable String savepointDirectory, long savepointTimeout);

Review comment:
       ```suggestion
               JobID jobId, @Nullable String savepointDirectory, long savepointTimeoutMs);
   ```

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -469,12 +487,34 @@ public boolean isShutdown() {
         final CheckpointProperties properties =
                 CheckpointProperties.forSyncSavepoint(!unalignedCheckpointsEnabled, terminate);
 
-        return triggerSavepointInternal(properties, targetLocation);
+        return triggerSavepointInternal(properties, targetLocation, 0);
+    }
+
+    /**
+     * Triggers a synchronous savepoint with the given savepoint directory as a target.
+     *
+     * @param terminate flag indicating if the job should terminate or just suspend
+     * @param targetLocation Target location for the savepoint, optional. If null, the state
+     *     backend's configured default will be used.
+     * @param savepointTimeout Timeout for the savepoint. If it <= 0, checkpoint timeout will take
+     *     effect.
+     * @return A future to the completed checkpoint
+     * @throws IllegalStateException If no savepoint directory has been specified and no default
+     *     savepoint directory has been configured
+     */
+    public CompletableFuture<CompletedCheckpoint> triggerSynchronousSavepoint(
+            final boolean terminate, @Nullable final String targetLocation, long savepointTimeout) {

Review comment:
       ```suggestion
               final boolean terminate, @Nullable final String targetLocation, long savepointTimeoutMs) {
   ```

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -514,6 +558,16 @@ public boolean isShutdown() {
             @Nullable String externalSavepointLocation,
             boolean isPeriodic) {
 
+        return triggerCheckpoint(props, externalSavepointLocation, 0, isPeriodic);
+    }
+
+    @VisibleForTesting
+    public CompletableFuture<CompletedCheckpoint> triggerCheckpoint(
+            CheckpointProperties props,
+            @Nullable String externalSavepointLocation,
+            long savepointTimeout,

Review comment:
       ```suggestion
               long savepointTimeoutMs,
   ```

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -450,7 +450,25 @@ public boolean isShutdown() {
             @Nullable final String targetLocation) {
         final CheckpointProperties properties =
                 CheckpointProperties.forSavepoint(!unalignedCheckpointsEnabled);
-        return triggerSavepointInternal(properties, targetLocation);
+        return triggerSavepointInternal(properties, targetLocation, 0);

Review comment:
       I am wondering whether it would be clearer if we passed 
`checkpointTimeout` instead of `0` here.
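To illustrate the suggestion, here is a minimal sketch in which the old entry point forwards the configured checkpoint timeout instead of `0`, so the internal method only ever sees a resolved value. `SavepointTriggerSketch` and its members are hypothetical simplifications, not Flink's `CheckpointCoordinator`. To keep the behaviour easy to check, the methods return the applied timeout where the real code would return a future.

```java
/**
 * Hypothetical, simplified stand-in for the coordinator (not Flink's CheckpointCoordinator).
 * The default is resolved at the public entry points, so the internal method never has to
 * interpret a magic 0.
 */
class SavepointTriggerSketch {

    /** Configured checkpoint timeout in milliseconds. */
    private final long checkpointTimeoutMs;

    SavepointTriggerSketch(long checkpointTimeoutMs) {
        if (checkpointTimeoutMs <= 0) {
            throw new IllegalArgumentException("checkpointTimeoutMs must be positive");
        }
        this.checkpointTimeoutMs = checkpointTimeoutMs;
    }

    /** Old variant: passes the configured checkpoint timeout instead of 0. */
    long triggerSavepoint(String targetLocation) {
        return triggerSavepointInternal(targetLocation, checkpointTimeoutMs);
    }

    /** New variant: keeps the PR's contract that a value <= 0 means "use the checkpoint timeout". */
    long triggerSavepoint(String targetLocation, long savepointTimeoutMs) {
        long effectiveTimeoutMs = savepointTimeoutMs > 0 ? savepointTimeoutMs : checkpointTimeoutMs;
        return triggerSavepointInternal(targetLocation, effectiveTimeoutMs);
    }

    /**
     * Always receives a resolved, positive timeout. Returns it instead of triggering anything so
     * the resolution logic can be checked; target handling is omitted.
     */
    private long triggerSavepointInternal(String targetLocation, long savepointTimeoutMs) {
        if (savepointTimeoutMs <= 0) {
            throw new IllegalStateException("timeout should have been resolved by the caller");
        }
        return savepointTimeoutMs;
    }
}
```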

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -2069,9 +2127,19 @@ private static CheckpointException getCheckpointException(
                 @Nullable String externalSavepointLocation,
                 boolean isPeriodic) {
 
+            this(props, externalSavepointLocation, 0, isPeriodic);
+        }
+
+        CheckpointTriggerRequest(
+                CheckpointProperties props,
+                @Nullable String externalSavepointLocation,
+                long savepointTimeout,
+                boolean isPeriodic) {
+
             this.timestamp = System.currentTimeMillis();
             this.props = checkNotNull(props);
             this.externalSavepointLocation = externalSavepointLocation;
+            this.savepointTimeout = savepointTimeout;

Review comment:
       Either `Duration` or we should put the unit in the name.
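Here is a sketch of the `Duration` option, assuming a trimmed-down request class. `TriggerRequestSketch` is hypothetical and is not Flink's `CheckpointTriggerRequest`. Storing a `Duration` keeps the unit attached to the value, and `null` replaces the `<= 0` sentinel as the "use the checkpoint timeout" marker. The older constructor chains to the new one, as in the diff.

```java
import java.time.Duration;
import java.util.Objects;

/**
 * Hypothetical, trimmed-down trigger request (not Flink's CheckpointTriggerRequest) that stores
 * the savepoint timeout as a Duration, so the unit travels with the value.
 */
final class TriggerRequestSketch {

    private final String externalSavepointLocation; // may be null
    private final Duration savepointTimeout; // null means "use the checkpoint timeout"
    private final boolean isPeriodic;

    TriggerRequestSketch(String externalSavepointLocation, boolean isPeriodic) {
        this(externalSavepointLocation, null, isPeriodic);
    }

    TriggerRequestSketch(
            String externalSavepointLocation, Duration savepointTimeout, boolean isPeriodic) {
        if (savepointTimeout != null
                && (savepointTimeout.isZero() || savepointTimeout.isNegative())) {
            throw new IllegalArgumentException(
                    "savepointTimeout must be positive: " + savepointTimeout);
        }
        this.externalSavepointLocation = externalSavepointLocation;
        this.savepointTimeout = savepointTimeout;
        this.isPeriodic = isPeriodic;
    }

    /** Resolves the timeout to apply, given the job's configured checkpoint timeout. */
    Duration effectiveTimeout(Duration checkpointTimeout) {
        Objects.requireNonNull(checkpointTimeout, "checkpointTimeout");
        return savepointTimeout != null ? savepointTimeout : checkpointTimeout;
    }

    String externalSavepointLocation() {
        return externalSavepointLocation;
    }

    boolean isPeriodic() {
        return isPeriodic;
    }
}
```

With this shape, a caller writes `Duration.ofSeconds(30)` rather than `30_000`. The caller therefore cannot accidentally pass seconds where milliseconds are expected.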

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
##########
@@ -234,6 +234,22 @@ void failSlot(
             final boolean cancelJob,
             @RpcTimeout final Time timeout);
 
+    /**
+     * Triggers taking a savepoint of the executed job.
+     *
+     * @param targetDirectory to which to write the savepoint data or null if the default savepoint
+     *     directory should be used
+     * @param savepointTimeout Timeout for the savepoint. If it <= 0, checkpoint timeout will take
+     *     effect.
+     * @param timeout for the rpc call
+     * @return Future which is completed with the savepoint path once completed
+     */
+    CompletableFuture<String> triggerSavepoint(
+            @Nullable final String targetDirectory,
+            final long savepointTimeout,
+            final boolean cancelJob,
+            @RpcTimeout final Time timeout);

Review comment:
       Let's remove the old `triggerSavepoint` method.

##########
File path: 
flink-tests/src/test/java/org/apache/flink/test/streaming/api/environment/RemoteStreamEnvironmentTest.java
##########
@@ -264,12 +264,27 @@ public String getWebInterfaceURL() {
             return null;
         }
 
+        @Override
+        public CompletableFuture<String> stopWithSavepoint(
+                JobID jobId,
+                boolean advanceToEndOfEventTime,
+                @Nullable String savepointDirectory,
+                long savepointTimeout) {
+            return null;

Review comment:
       Same here: `FutureUtils.unsupportedOperationFuture()`.
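Returning `null` from a method typed `CompletableFuture<String>` leaves any caller that chains on the result with a `NullPointerException`. A failed future instead reports the problem through the normal asynchronous path. The helper below is a local, hypothetical stand-in that only mirrors the intent of the `FutureUtils.unsupportedOperationFuture()` helper the review mentions. It does not claim to reproduce Flink's implementation.

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical helper for test stubs; mirrors the intent of the helper named in the review. */
final class UnsupportedFutures {

    private UnsupportedFutures() {}

    /** Returns a future that is already completed with an UnsupportedOperationException. */
    static <T> CompletableFuture<T> unsupportedOperationFuture() {
        CompletableFuture<T> future = new CompletableFuture<>();
        future.completeExceptionally(new UnsupportedOperationException());
        return future;
    }

    /** Example stub in the style of the test client in the diff: never returns null. */
    static CompletableFuture<String> stopWithSavepoint(
            String savepointDirectory, long savepointTimeoutMs) {
        return unsupportedOperationFuture();
    }
}
```

Calling `join()` on such a future throws a `CompletionException` whose cause is the `UnsupportedOperationException`.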

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -450,7 +450,25 @@ public boolean isShutdown() {
             @Nullable final String targetLocation) {
         final CheckpointProperties properties =
                 CheckpointProperties.forSavepoint(!unalignedCheckpointsEnabled);
-        return triggerSavepointInternal(properties, targetLocation);
+        return triggerSavepointInternal(properties, targetLocation, 0);
+    }
+
+    /**
+     * Triggers a savepoint with the given savepoint directory as a target.
+     *
+     * @param targetLocation Target location for the savepoint, optional. If null, the state
+     *     backend's configured default will be used.
+     * @param savepointTimeout Timeout for the savepoint. If it <= 0, checkpoint timeout will take
+     *     effect.
+     * @return A future to the completed checkpoint
+     * @throws IllegalStateException If no savepoint directory has been specified and no default
+     *     savepoint directory has been configured
+     */
+    public CompletableFuture<CompletedCheckpoint> triggerSavepoint(
+            @Nullable final String targetLocation, final long savepointTimeout) {

Review comment:
       If we used a `Duration` type for the `CheckpointCoordinator`, then we wouldn't have to rely on users of this class providing the timeout in the correct unit.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
##########
@@ -234,6 +234,22 @@ void failSlot(
             final boolean cancelJob,
             @RpcTimeout final Time timeout);
 
+    /**
+     * Triggers taking a savepoint of the executed job.
+     *
+     * @param targetDirectory to which to write the savepoint data or null if the default savepoint
+     *     directory should be used
+     * @param savepointTimeout Timeout for the savepoint. If it <= 0, checkpoint timeout will take
+     *     effect.
+     * @param timeout for the rpc call
+     * @return Future which is completed with the savepoint path once completed
+     */
+    CompletableFuture<String> triggerSavepoint(
+            @Nullable final String targetDirectory,
+            final long savepointTimeout,
+            final boolean cancelJob,
+            @RpcTimeout final Time timeout);

Review comment:
       Alternatively we could rename the old method to 
`triggerSavepointWithDefaultTimeout` or so.
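Here is a rough sketch of the renaming alternative, using a hypothetical, simplified gateway. `SavepointGatewaySketch` and `InMemorySavepointGateway` are not Flink types, and the RPC timeout parameter is omitted. Because the default-timeout variant has its own name, each call site states which behaviour it wants rather than relying on whichever overload the argument list happens to select.

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical, simplified gateway; the RPC timeout parameter from the diff is omitted. */
interface SavepointGatewaySketch {

    /** Uses the job's configured checkpoint timeout. */
    CompletableFuture<String> triggerSavepointWithDefaultTimeout(
            String targetDirectory, boolean cancelJob);

    /** Uses an explicit savepoint timeout in milliseconds (must be positive). */
    CompletableFuture<String> triggerSavepoint(
            String targetDirectory, long savepointTimeoutMs, boolean cancelJob);
}

/** Toy implementation that only reports which timeout would be used. */
final class InMemorySavepointGateway implements SavepointGatewaySketch {

    private final long checkpointTimeoutMs;

    InMemorySavepointGateway(long checkpointTimeoutMs) {
        this.checkpointTimeoutMs = checkpointTimeoutMs;
    }

    @Override
    public CompletableFuture<String> triggerSavepointWithDefaultTimeout(
            String targetDirectory, boolean cancelJob) {
        // The default is chosen by method name, not by passing a magic 0.
        return triggerSavepoint(targetDirectory, checkpointTimeoutMs, cancelJob);
    }

    @Override
    public CompletableFuture<String> triggerSavepoint(
            String targetDirectory, long savepointTimeoutMs, boolean cancelJob) {
        if (savepointTimeoutMs <= 0) {
            CompletableFuture<String> failed = new CompletableFuture<>();
            failed.completeExceptionally(
                    new IllegalArgumentException("savepointTimeoutMs must be positive"));
            return failed;
        }
        // Pretend the savepoint completed and report the timeout that was applied.
        return CompletableFuture.completedFuture(
                targetDirectory + " (timeout " + savepointTimeoutMs + " ms)");
    }
}
```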

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -450,7 +450,25 @@ public boolean isShutdown() {
             @Nullable final String targetLocation) {
         final CheckpointProperties properties =
                 CheckpointProperties.forSavepoint(!unalignedCheckpointsEnabled);
-        return triggerSavepointInternal(properties, targetLocation);
+        return triggerSavepointInternal(properties, targetLocation, 0);
+    }
+
+    /**
+     * Triggers a savepoint with the given savepoint directory as a target.
+     *
+     * @param targetLocation Target location for the savepoint, optional. If null, the state
+     *     backend's configured default will be used.
+     * @param savepointTimeout Timeout for the savepoint. If it <= 0, checkpoint timeout will take

Review comment:
       Unit is missing.

##########
File path: 
flink-clients/src/main/java/org/apache/flink/client/cli/SavepointOptions.java
##########
@@ -54,4 +60,8 @@ public String getSavepointPath() {
     public String getJarFilePath() {
         return jarFile;
     }
+
+    public long getSavepointTimeout() {

Review comment:
       ```suggestion
       public long getSavepointTimeoutMs() {
   ```

##########
File path: 
flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
##########
@@ -156,6 +156,26 @@
             final boolean advanceToEndOfEventTime,
             @Nullable final String savepointDirectory);
 
+    /**
+     * Stops a program on Flink cluster whose job-manager is configured in this client's
+     * configuration. Stopping works only for streaming programs. Be aware, that the program might
+     * continue to run for a while after sending the stop command, because after sources stopped to
+     * emit data all operators need to finish processing.
+     *
+     * @param jobId the job ID of the streaming program to stop
+     * @param advanceToEndOfEventTime flag indicating if the source should inject a {@code
+     *     MAX_WATERMARK} in the pipeline
+     * @param savepointDirectory directory the savepoint should be written to
+     * @param savepointTimeout Timeout for the savepoint. If it <= 0, checkpoint timeout will take
+     *     effect.
+     * @return a {@link CompletableFuture} containing the path where the savepoint is located
+     */
+    CompletableFuture<String> stopWithSavepoint(
+            final JobID jobId,
+            final boolean advanceToEndOfEventTime,
+            @Nullable final String savepointDirectory,
+            final long savepointTimeout);

Review comment:
       ```suggestion
               final long savepointTimeoutMs);
   ```

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -450,7 +450,25 @@ public boolean isShutdown() {
             @Nullable final String targetLocation) {
         final CheckpointProperties properties =
                 CheckpointProperties.forSavepoint(!unalignedCheckpointsEnabled);
-        return triggerSavepointInternal(properties, targetLocation);
+        return triggerSavepointInternal(properties, targetLocation, 0);
+    }
+
+    /**
+     * Triggers a savepoint with the given savepoint directory as a target.
+     *
+     * @param targetLocation Target location for the savepoint, optional. If null, the state
+     *     backend's configured default will be used.
+     * @param savepointTimeout Timeout for the savepoint. If it <= 0, checkpoint timeout will take
+     *     effect.
+     * @return A future to the completed checkpoint
+     * @throws IllegalStateException If no savepoint directory has been specified and no default
+     *     savepoint directory has been configured
+     */
+    public CompletableFuture<CompletedCheckpoint> triggerSavepoint(
+            @Nullable final String targetLocation, final long savepointTimeout) {

Review comment:
       ```suggestion
               @Nullable final String targetLocation, final long savepointTimeoutMs) {
   ```

##########
File path: docs/content/docs/deployment/cli.md
##########
@@ -119,6 +120,8 @@ You can resume your program from this savepoint with the run command.
 The savepoint folder is optional and needs to be specified if
 [state.savepoints.dir]({{< ref "docs/deployment/config" >}}#state-savepoints-dir) isn't set.
 
+The savepoint timeout is optional and checkpoint timeout will take effect if isn't set.

Review comment:
       ```suggestion
   The savepoint timeout `--savepointTimeout` is optional and the checkpoint timeout will take effect if it isn't set.
   ```

##########
File path: 
flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
##########
@@ -156,6 +156,26 @@
             final boolean advanceToEndOfEventTime,
             @Nullable final String savepointDirectory);
 
+    /**
+     * Stops a program on Flink cluster whose job-manager is configured in this client's
+     * configuration. Stopping works only for streaming programs. Be aware, that the program might
+     * continue to run for a while after sending the stop command, because after sources stopped to
+     * emit data all operators need to finish processing.
+     *
+     * @param jobId the job ID of the streaming program to stop
+     * @param advanceToEndOfEventTime flag indicating if the source should inject a {@code
+     *     MAX_WATERMARK} in the pipeline
+     * @param savepointDirectory directory the savepoint should be written to
+     * @param savepointTimeout Timeout for the savepoint. If it <= 0, checkpoint timeout will take

Review comment:
       Unit description is missing.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
##########
@@ -716,6 +731,20 @@ JobManagerRunner createJobManagerRunner(JobGraph jobGraph, long initializationTi
                 jobId, gateway -> gateway.stopWithSavepoint(targetDirectory, terminate, timeout));
     }
 
+    @Override
+    public CompletableFuture<String> stopWithSavepoint(
+            final JobID jobId,
+            final String targetDirectory,
+            final long savepointTimeout,
+            final boolean terminate,
+            final Time timeout) {
+        return performOperationOnJobMasterGateway(
+                jobId,
+                gateway ->
+                        gateway.stopWithSavepoint(
+                                targetDirectory, savepointTimeout, terminate, timeout));

Review comment:
       Same here for `stopWithSavepoint`.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java
##########
@@ -145,19 +145,60 @@
         throw new UnsupportedOperationException();
     }
 
+    /**
+     * Triggers a savepoint with the given savepoint directory as a target.
+     *
+     * @param jobId ID of the job for which the savepoint should be triggered.
+     * @param targetDirectory Target directory for the savepoint.
+     * @param savepointTimeout Timeout for the savepoint. If it <= 0, checkpoint timeout will take
+     *     effect.
+     * @param timeout Timeout for the asynchronous operation
+     * @return A future to the {@link CompletedCheckpoint#getExternalPointer() external pointer} of
+     *     the savepoint.
+     */
+    default CompletableFuture<String> triggerSavepoint(
+            JobID jobId,
+            String targetDirectory,
+            long savepointTimeout,
+            boolean cancelJob,
+            @RpcTimeout Time timeout) {
+        throw new UnsupportedOperationException();
+    }
+
+    /**
+     * Stops the job with a savepoint.
+     *
+     * @param jobId ID of the job for which the savepoint should be triggered.
+     * @param targetDirectory to which to write the savepoint data or null if the default savepoint
+     *     directory should be used
+     * @param terminate flag indicating if the job should terminate or just suspend
+     * @param timeout for the rpc call
+     * @return Future which is completed with the savepoint path once completed
+     */
+    default CompletableFuture<String> stopWithSavepoint(
+            final JobID jobId,
+            final String targetDirectory,
+            final boolean terminate,
+            @RpcTimeout final Time timeout) {
+        throw new UnsupportedOperationException();
+    }

Review comment:
       Can we get rid of this method?

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java
##########
@@ -145,19 +145,60 @@
         throw new UnsupportedOperationException();
     }

Review comment:
       Can we get rid of this method?

##########
File path: 
flink-tests/src/test/java/org/apache/flink/test/streaming/api/environment/RemoteStreamEnvironmentTest.java
##########
@@ -264,12 +264,27 @@ public String getWebInterfaceURL() {
             return null;
         }
 
+        @Override
+        public CompletableFuture<String> stopWithSavepoint(
+                JobID jobId,
+                boolean advanceToEndOfEventTime,
+                @Nullable String savepointDirectory,
+                long savepointTimeout) {
+            return null;
+        }
+
         @Override
         public CompletableFuture<String> triggerSavepoint(
                 JobID jobId, @Nullable String savepointDirectory) {
             return null;
         }
 
+        @Override
+        public CompletableFuture<String> triggerSavepoint(
+                JobID jobId, @Nullable String savepointDirectory, long savepointTimeout) {
+            return null;

Review comment:
       Let's return `FutureUtils.unsupportedOperationFuture()`.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
##########
@@ -935,9 +941,16 @@ public void reportCheckpointMetrics(
         executionGraphHandler.reportCheckpointMetrics(attemptId, id, metrics);
     }
 
+    @Override
+    public CompletableFuture<String> stopWithSavepoint(String targetDirectory, boolean terminate) {
+        return stopWithSavepoint(targetDirectory, 0, terminate);
+    }
+
     @Override
     public CompletableFuture<String> stopWithSavepoint(
-            @Nullable final String targetDirectory, final boolean terminate) {
+            @Nullable final String targetDirectory,
+            final long savepointTimeout,

Review comment:
       Unit is missing.

##########
File path: 
flink-clients/src/main/java/org/apache/flink/client/cli/StopOptions.java
##########
@@ -57,6 +64,10 @@ String getTargetDirectory() {
         return targetDirectory;
     }
 
+    long getSavepointTimeout() {

Review comment:
       ```suggestion
       long getSavepointTimeoutMs() {
   ```

##########
File path: 
flink-clients/src/main/java/org/apache/flink/client/cli/StopOptions.java
##########
@@ -33,6 +34,8 @@
     /** Optional target directory for the savepoint. Overwrites cluster default. */
     private final String targetDirectory;
 
+    private final long savepointTimeout;

Review comment:
       ```suggestion
       private final long savepointTimeoutMs;
   ```

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -469,12 +487,34 @@ public boolean isShutdown() {
         final CheckpointProperties properties =
                 CheckpointProperties.forSyncSavepoint(!unalignedCheckpointsEnabled, terminate);
 
-        return triggerSavepointInternal(properties, targetLocation);
+        return triggerSavepointInternal(properties, targetLocation, 0);
+    }
+
+    /**
+     * Triggers a synchronous savepoint with the given savepoint directory as a target.
+     *
+     * @param terminate flag indicating if the job should terminate or just suspend
+     * @param targetLocation Target location for the savepoint, optional. If null, the state
+     *     backend's configured default will be used.
+     * @param savepointTimeout Timeout for the savepoint. If it <= 0, checkpoint timeout will take

Review comment:
       Unit is missing.

##########
File path: 
flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
##########
@@ -167,6 +187,20 @@
      */
     CompletableFuture<String> triggerSavepoint(JobID jobId, @Nullable String savepointDirectory);
 
+    /**
+     * Triggers a savepoint for the job identified by the job id. The savepoint will be written to
+     * the given savepoint directory, or {@link
+     * org.apache.flink.configuration.CheckpointingOptions#SAVEPOINT_DIRECTORY} if it is null.
+     *
+     * @param jobId job id
+     * @param savepointDirectory directory the savepoint should be written to
+     * @param savepointTimeout Timeout for the savepoint. If it <= 0, checkpoint timeout will take

Review comment:
       Unit description is missing.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -469,12 +487,34 @@ public boolean isShutdown() {
         final CheckpointProperties properties =
                 CheckpointProperties.forSyncSavepoint(!unalignedCheckpointsEnabled, terminate);
 
-        return triggerSavepointInternal(properties, targetLocation);
+        return triggerSavepointInternal(properties, targetLocation, 0);
+    }
+
+    /**
+     * Triggers a synchronous savepoint with the given savepoint directory as a target.
+     *
+     * @param terminate flag indicating if the job should terminate or just suspend
+     * @param targetLocation Target location for the savepoint, optional. If null, the state
+     *     backend's configured default will be used.
+     * @param savepointTimeout Timeout for the savepoint. If it <= 0, checkpoint timeout will take
+     *     effect.
+     * @return A future to the completed checkpoint
+     * @throws IllegalStateException If no savepoint directory has been specified and no default
+     *     savepoint directory has been configured
+     */
+    public CompletableFuture<CompletedCheckpoint> triggerSynchronousSavepoint(
+            final boolean terminate, @Nullable final String targetLocation, long savepointTimeout) {
+
+        final CheckpointProperties properties =
+                CheckpointProperties.forSyncSavepoint(!unalignedCheckpointsEnabled, terminate);
+
+        return triggerSavepointInternal(properties, targetLocation, savepointTimeout);
     }
 
     private CompletableFuture<CompletedCheckpoint> triggerSavepointInternal(
             final CheckpointProperties checkpointProperties,
-            @Nullable final String targetLocation) {
+            @Nullable final String targetLocation,
+            long savepointTimeout) {

Review comment:
       ```suggestion
               long savepointTimeoutMs) {
   ```

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
##########
@@ -706,6 +706,21 @@ JobManagerRunner createJobManagerRunner(JobGraph jobGraph, long initializationTi
                 jobId, gateway -> gateway.triggerSavepoint(targetDirectory, cancelJob, timeout));
     }
 
+    @Override
+    public CompletableFuture<String> triggerSavepoint(
+            JobID jobId,
+            String targetDirectory,
+            long savepointTimeout,
+            boolean cancelJob,
+            Time timeout) {

Review comment:
       Can we replace the old `triggerSavepoint` method completely?

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
##########
@@ -256,6 +272,23 @@ void failSlot(
             final boolean terminate,
             @RpcTimeout final Time timeout);
 
+    /**
+     * Stops the job with a savepoint.
+     *
+     * @param targetDirectory to which to write the savepoint data or null if the default savepoint
+     *     directory should be used
+     * @param savepointTimeout Timeout for the savepoint. If it <= 0, checkpoint timeout will take
+     *     effect.
+     * @param terminate flag indicating if the job should terminate or just suspend
+     * @param timeout for the rpc call
+     * @return Future which is completed with the savepoint path once completed
+     */
+    CompletableFuture<String> stopWithSavepoint(
+            @Nullable final String targetDirectory,
+            final long savepointTimeout,
+            final boolean terminate,
+            @RpcTimeout final Time timeout);

Review comment:
       And `stopWithSavepoint`.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
##########
@@ -800,7 +809,17 @@ private boolean isConnectingToResourceManager(ResourceManagerId resourceManagerI
     public CompletableFuture<String> stopWithSavepoint(
             @Nullable final String targetDirectory, final boolean terminate, final Time timeout) {
 
-        return schedulerNG.stopWithSavepoint(targetDirectory, terminate);
+        return stopWithSavepoint(targetDirectory, 0, terminate, timeout);
+    }
+
+    @Override
+    public CompletableFuture<String> stopWithSavepoint(
+            @Nullable final String targetDirectory,
+            final long savepointTimeout,
+            final boolean terminate,
+            final Time timeout) {
+
+        return schedulerNG.stopWithSavepoint(targetDirectory, savepointTimeout, terminate);

Review comment:
       Same here.

##########
File path: 
flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
##########
@@ -132,6 +132,9 @@
     static final Option SAVEPOINT_DISPOSE_OPTION =
             new Option("d", "dispose", true, "Path of savepoint to dispose.");
 
+    static final Option SAVEPOINT_TIMEOUT_OPTION =
+            new Option("st", "savepointTimeout", true, "timeout of savepoint.");

Review comment:
       What is the unit of the savepoint timeout? ms? Then I would suggest making this explicit via `savepointTimeoutMs` and also mentioning the unit in the description.
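To illustrate making the unit explicit at the CLI boundary, here is a hypothetical parsing helper. `SavepointTimeoutOption` is not part of Flink, and the description text is only an example. The helper reads the option value as milliseconds and maps an absent option to `0`, which the PR's Javadoc interprets as "use the checkpoint timeout". Rejecting every other non-positive or non-numeric value is a design choice in this sketch, not something the PR specifies.

```java
/** Hypothetical CLI helper; the option name mirrors the reviewer's proposal. */
final class SavepointTimeoutOption {

    /** Example description that states the unit explicitly. */
    static final String DESCRIPTION =
            "The maximum time in milliseconds a savepoint may take before it is failed. "
                    + "If not set, the checkpoint timeout applies.";

    private SavepointTimeoutOption() {}

    /**
     * Parses the raw option value (milliseconds). Returns 0 when the option is absent, which the
     * rest of the PR treats as "use the checkpoint timeout".
     */
    static long parseSavepointTimeoutMs(String rawValue) {
        if (rawValue == null) {
            return 0L;
        }
        final long timeoutMs;
        try {
            timeoutMs = Long.parseLong(rawValue.trim());
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException(
                    "savepointTimeoutMs is not a number of milliseconds: " + rawValue, e);
        }
        if (timeoutMs <= 0) {
            throw new IllegalArgumentException("savepointTimeoutMs must be positive: " + timeoutMs);
        }
        return timeoutMs;
    }
}
```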

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
##########
@@ -788,7 +788,16 @@ private boolean isConnectingToResourceManager(ResourceManagerId resourceManagerI
     public CompletableFuture<String> triggerSavepoint(
             @Nullable final String targetDirectory, final boolean cancelJob, final Time timeout) {
 
-        return schedulerNG.triggerSavepoint(targetDirectory, cancelJob);
+        return triggerSavepoint(targetDirectory, 0, cancelJob, timeout);
+    }
+
+    @Override
+    public CompletableFuture<String> triggerSavepoint(
+            @Nullable String targetDirectory,
+            long savepointTimeout,
+            boolean cancelJob,
+            Time timeout) {
+        return schedulerNG.triggerSavepoint(targetDirectory, savepointTimeout, cancelJob);

Review comment:
       Can we replace the old `triggerSavepoint` method here? This should not 
be public API.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
##########
@@ -935,9 +941,16 @@ public void reportCheckpointMetrics(
         executionGraphHandler.reportCheckpointMetrics(attemptId, id, metrics);
     }
 
+    @Override
+    public CompletableFuture<String> stopWithSavepoint(String targetDirectory, boolean terminate) {
+        return stopWithSavepoint(targetDirectory, 0, terminate);
+    }
+
     @Override
     public CompletableFuture<String> stopWithSavepoint(
-            @Nullable final String targetDirectory, final boolean terminate) {
+            @Nullable final String targetDirectory,
+            final long savepointTimeout,
+            final boolean terminate) {

Review comment:
       If we have overloaded methods, then I like to keep the parameter prefix the same across the different variants. I think this is easier for people to use. So in this case, I would put `savepointTimeout` at the end.
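Here is a minimal sketch of the proposed ordering, using a hypothetical class (`StopWithSavepointSketch`, not Flink's `SchedulerBase`). Both overloads share the `(targetDirectory, terminate)` prefix, and the new parameter is appended at the end. The named constant `USE_CHECKPOINT_TIMEOUT` is an assumption of this sketch that spells out the PR's `0` default. Each method returns a description string where the real code would do actual work.

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical class illustrating overloads that share a common parameter prefix. */
class StopWithSavepointSketch {

    /** Spells out the PR's convention that 0 means "use the checkpoint timeout". */
    static final long USE_CHECKPOINT_TIMEOUT = 0L;

    /** Short variant: (targetDirectory, terminate). */
    CompletableFuture<String> stopWithSavepoint(String targetDirectory, boolean terminate) {
        return stopWithSavepoint(targetDirectory, terminate, USE_CHECKPOINT_TIMEOUT);
    }

    /** Long variant: same prefix, new parameter appended at the end. */
    CompletableFuture<String> stopWithSavepoint(
            String targetDirectory, boolean terminate, long savepointTimeoutMs) {
        return CompletableFuture.completedFuture(
                "dir=" + targetDirectory
                        + ", terminate=" + terminate
                        + ", timeoutMs=" + savepointTimeoutMs);
    }
}
```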




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
