This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 5d66e82915e [FLINK-27101][checkpointing][rest] Add restful API to 
trigger checkpoints
5d66e82915e is described below

commit 5d66e82915eace9342c175163b17f610bfbf7fa4
Author: Jiale <jiale....@vungle.com>
AuthorDate: Thu Oct 13 11:54:29 2022 -0700

    [FLINK-27101][checkpointing][rest] Add restful API to trigger checkpoints
---
 .../shortcodes/generated/rest_v1_dispatcher.html   | 141 +++++++++
 docs/static/generated/rest_v1_dispatcher.yml       |  70 +++++
 .../flink/core/execution/CheckpointType.java       |  48 +++
 .../src/test/resources/rest_api_v1.snapshot        |  73 +++++
 .../runtime/checkpoint/CheckpointCoordinator.java  |  46 ++-
 .../flink/runtime/checkpoint/CheckpointType.java   |   3 +
 .../flink/runtime/dispatcher/Dispatcher.java       |  25 ++
 .../DispatcherCachedOperationsHandler.java         |  35 +++
 .../dispatcher/DispatcherOperationCaches.java      |  13 +-
 .../dispatcher/TriggerCheckpointFunction.java      |  34 ++
 .../apache/flink/runtime/jobmaster/JobMaster.java  |  13 +-
 .../flink/runtime/jobmaster/JobMasterGateway.java  |  17 +-
 .../job/checkpoints/CheckpointHandlers.java        | 257 +++++++++++++++
 .../CheckpointInfo.java}                           |  31 +-
 .../checkpoints/CheckpointStatusHeaders.java       |  81 +++++
 .../CheckpointStatusMessageParameters.java         |  48 +++
 .../checkpoints/CheckpointTriggerHeaders.java      |  77 +++++
 .../CheckpointTriggerMessageParameters.java        |  43 +++
 .../checkpoints/CheckpointTriggerRequestBody.java  |  63 ++++
 .../messages/job/savepoints/SavepointInfo.java     |   3 +
 .../flink/runtime/scheduler/SchedulerBase.java     |   6 +-
 .../flink/runtime/scheduler/SchedulerNG.java       |   4 +-
 .../scheduler/adaptive/AdaptiveScheduler.java      |   7 +-
 .../adaptive/StateWithExecutionGraph.java          |   6 +-
 .../flink/runtime/webmonitor/RestfulGateway.java   |  30 ++
 .../runtime/webmonitor/WebMonitorEndpoint.java     |  13 +
 .../CheckpointCoordinatorTriggeringTest.java       | 168 +++++++++-
 .../DispatcherCachedOperationsHandlerTest.java     |  73 ++++-
 .../jobmaster/utils/TestingJobMasterGateway.java   |  15 +-
 .../utils/TestingJobMasterGatewayBuilder.java      |   9 +-
 .../job/checkpoints/CheckpointHandlersTest.java    | 345 +++++++++++++++++++++
 .../runtime/scheduler/TestingSchedulerNG.java      |  20 +-
 .../webmonitor/TestingDispatcherGateway.java       |  10 +
 .../runtime/webmonitor/TestingRestfulGateway.java  |  65 ++++
 34 files changed, 1841 insertions(+), 51 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html 
b/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html
index f9cfafd5954..93122afd75c 100644
--- a/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html
+++ b/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html
@@ -1776,6 +1776,68 @@ Using 'curl' you can upload a jar via 'curl -X POST -H 
"Expect:" -F "jarfile=@pa
     </tr>
   </tbody>
 </table>
+<table class="rest-api table table-bordered">
+  <tbody>
+    <tr>
+      <td class="text-left" 
colspan="2"><h5><strong>/jobs/:jobid/checkpoints</strong></h5></td>
+    </tr>
+    <tr>
+      <td class="text-left" style="width: 20%">Verb: <code>POST</code></td>
+      <td class="text-left">Response code: <code>202 Accepted</code></td>
+    </tr>
+    <tr>
+      <td colspan="2">Triggers a checkpoint. This async operation would return 
a 'triggerid' for further query identifier.</td>
+    </tr>
+    <tr>
+      <td colspan="2">Path parameters</td>
+    </tr>
+    <tr>
+      <td colspan="2">
+        <ul>
+<li><code>jobid</code> - 32-character hexadecimal string value that identifies 
a job.</li>
+        </ul>
+      </td>
+    </tr>
+    <tr>
+      <td colspan="2">
+        <label>
+          <details>
+          <summary>Request</summary>
+          <pre><code>{
+  "type" : "object",
+  "id" : 
"urn:jsonschema:org:apache:flink:runtime:rest:messages:checkpoints:CheckpointTriggerRequestBody",
+  "properties" : {
+    "checkpointType" : {
+      "type" : "string",
+      "enum" : [ "CONFIGURED", "FULL", "INCREMENTAL" ]
+    },
+    "triggerId" : {
+      "type" : "any"
+    }
+  }
+}</code></pre>
+        </label>
+      </td>
+    </tr>
+    <tr>
+      <td colspan="2">
+        <label>
+          <details>
+          <summary>Response</summary>
+          <pre><code>{
+  "type" : "object",
+  "id" : 
"urn:jsonschema:org:apache:flink:runtime:rest:handler:async:TriggerResponse",
+  "properties" : {
+    "request-id" : {
+      "type" : "any"
+    }
+  }
+}</code></pre>
+        </label>
+      </td>
+    </tr>
+  </tbody>
+</table>
 <table class="rest-api table table-bordered">
   <tbody>
     <tr>
@@ -2194,6 +2256,77 @@ Using 'curl' you can upload a jar via 'curl -X POST -H 
"Expect:" -F "jarfile=@pa
     </tr>
   </tbody>
 </table>
+<table class="rest-api table table-bordered">
+  <tbody>
+    <tr>
+      <td class="text-left" 
colspan="2"><h5><strong>/jobs/:jobid/checkpoints/:triggerid</strong></h5></td>
+    </tr>
+    <tr>
+      <td class="text-left" style="width: 20%">Verb: <code>GET</code></td>
+      <td class="text-left">Response code: <code>200 OK</code></td>
+    </tr>
+    <tr>
+      <td colspan="2">Returns the status of a checkpoint trigger 
operation.</td>
+    </tr>
+    <tr>
+      <td colspan="2">Path parameters</td>
+    </tr>
+    <tr>
+      <td colspan="2">
+        <ul>
+<li><code>jobid</code> - 32-character hexadecimal string value that identifies 
a job.</li>
+<li><code>triggerid</code> - 32-character hexadecimal string that identifies 
an asynchronous operation trigger ID. The ID was returned then the operation 
was triggered.</li>
+        </ul>
+      </td>
+    </tr>
+    <tr>
+      <td colspan="2">
+        <label>
+          <details>
+          <summary>Request</summary>
+          <pre><code>{}</code></pre>
+        </label>
+      </td>
+    </tr>
+    <tr>
+      <td colspan="2">
+        <label>
+          <details>
+          <summary>Response</summary>
+          <pre><code>{
+  "type" : "object",
+  "id" : 
"urn:jsonschema:org:apache:flink:runtime:rest:handler:async:AsynchronousOperationResult",
+  "properties" : {
+    "operation" : {
+      "type" : "object",
+      "id" : 
"urn:jsonschema:org:apache:flink:runtime:rest:messages:checkpoints:CheckpointInfo",
+      "properties" : {
+        "checkpointId" : {
+          "type" : "integer"
+        },
+        "failureCause" : {
+          "type" : "any"
+        }
+      }
+    },
+    "status" : {
+      "type" : "object",
+      "id" : 
"urn:jsonschema:org:apache:flink:runtime:rest:messages:queue:QueueStatus",
+      "properties" : {
+        "id" : {
+          "type" : "string",
+          "required" : true,
+          "enum" : [ "IN_PROGRESS", "COMPLETED" ]
+        }
+      }
+    }
+  }
+}</code></pre>
+        </label>
+      </td>
+    </tr>
+  </tbody>
+</table>
 <table class="rest-api table table-bordered">
   <tbody>
     <tr>
@@ -3467,6 +3600,10 @@ Using 'curl' you can upload a jar via 'curl -X POST -H 
"Expect:" -F "jarfile=@pa
       "type" : "string",
       "enum" : [ "ok", "low", "high" ]
     },
+    "backpressureLevel" : {
+      "type" : "string",
+      "enum" : [ "ok", "low", "high" ]
+    },
     "end-timestamp" : {
       "type" : "integer"
     },
@@ -3487,6 +3624,10 @@ Using 'curl' you can upload a jar via 'curl -X POST -H 
"Expect:" -F "jarfile=@pa
             "type" : "string",
             "enum" : [ "ok", "low", "high" ]
           },
+          "backpressureLevel" : {
+            "type" : "string",
+            "enum" : [ "ok", "low", "high" ]
+          },
           "busyRatio" : {
             "type" : "number"
           },
diff --git a/docs/static/generated/rest_v1_dispatcher.yml 
b/docs/static/generated/rest_v1_dispatcher.yml
index 60b22473cb3..4d7a520d316 100644
--- a/docs/static/generated/rest_v1_dispatcher.yml
+++ b/docs/static/generated/rest_v1_dispatcher.yml
@@ -495,6 +495,29 @@ paths:
             application/json:
               schema:
                 $ref: '#/components/schemas/CheckpointingStatistics'
+    post:
+      description: Triggers a checkpoint. This async operation would return a 
'triggerid'
+        for further query identifier.
+      operationId: triggerCheckpoint
+      parameters:
+      - name: jobid
+        in: path
+        description: 32-character hexadecimal string value that identifies a 
job.
+        required: true
+        schema:
+          $ref: '#/components/schemas/JobID'
+      requestBody:
+        content:
+          application/json:
+            schema:
+              $ref: '#/components/schemas/CheckpointTriggerRequestBody'
+      responses:
+        "202":
+          description: The request was successful.
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/TriggerResponse'
   /jobs/{jobid}/checkpoints/config:
     get:
       description: Returns the checkpointing configuration.
@@ -569,6 +592,31 @@ paths:
             application/json:
               schema:
                 $ref: 
'#/components/schemas/TaskCheckpointStatisticsWithSubtaskDetails'
+  /jobs/{jobid}/checkpoints/{triggerid}:
+    get:
+      description: Returns the status of a checkpoint trigger operation.
+      operationId: getCheckpointStatus
+      parameters:
+      - name: jobid
+        in: path
+        description: 32-character hexadecimal string value that identifies a 
job.
+        required: true
+        schema:
+          $ref: '#/components/schemas/JobID'
+      - name: triggerid
+        in: path
+        description: 32-character hexadecimal string that identifies an 
asynchronous
+          operation trigger ID. The ID was returned then the operation was 
triggered.
+        required: true
+        schema:
+          $ref: '#/components/schemas/TriggerId'
+      responses:
+        "200":
+          description: The request was successful.
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/AsynchronousOperationResult'
   /jobs/{jobid}/config:
     get:
       description: Returns the configuration of a job.
@@ -1559,6 +1607,7 @@ components:
         operation:
           oneOf:
           - $ref: '#/components/schemas/AsynchronousOperationInfo'
+          - $ref: '#/components/schemas/CheckpointInfo'
           - $ref: '#/components/schemas/SavepointInfo'
         status:
           $ref: '#/components/schemas/QueueStatus'
@@ -1626,6 +1675,14 @@ components:
         sync:
           type: integer
           format: int64
+    CheckpointInfo:
+      type: object
+      properties:
+        checkpointId:
+          type: integer
+          format: int64
+        failureCause:
+          $ref: '#/components/schemas/SerializedThrowable'
     CheckpointStatistics:
       required:
       - className
@@ -1684,6 +1741,19 @@ components:
       - IN_PROGRESS
       - COMPLETED
       - FAILED
+    CheckpointTriggerRequestBody:
+      type: object
+      properties:
+        checkpointType:
+          $ref: '#/components/schemas/CheckpointType'
+        triggerId:
+          $ref: '#/components/schemas/TriggerId'
+    CheckpointType:
+      type: string
+      enum:
+      - CONFIGURED
+      - FULL
+      - INCREMENTAL
     CheckpointingStatistics:
       type: object
       properties:
diff --git 
a/flink-core/src/main/java/org/apache/flink/core/execution/CheckpointType.java 
b/flink-core/src/main/java/org/apache/flink/core/execution/CheckpointType.java
new file mode 100644
index 00000000000..521846b174e
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/core/execution/CheckpointType.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.execution;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.DescribedEnum;
+import org.apache.flink.configuration.description.InlineElement;
+
+import static org.apache.flink.configuration.description.TextElement.text;
+
+/** Describes the type in which a checkpoint should be taken. */
+@PublicEvolving
+public enum CheckpointType implements DescribedEnum {
+    CONFIGURED("The checkpoint type derived from the job config"),
+
+    FULL("A checkpoint type that checkpoints the entire state, common for all 
state backends."),
+
+    INCREMENTAL(
+            "A checkpoint type that checkpoints only the difference between 
snapshots, specific for certain state backend.");
+
+    private final InlineElement description;
+    public static final CheckpointType DEFAULT = CheckpointType.CONFIGURED;
+
+    CheckpointType(String description) {
+        this.description = text(description);
+    }
+
+    @Override
+    public InlineElement getDescription() {
+        return description;
+    }
+}
diff --git a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot 
b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
index b13156998af..b952536a663 100644
--- a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
+++ b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
@@ -1343,6 +1343,41 @@
         }
       }
     }
+  }, {
+    "url" : "/jobs/:jobid/checkpoints",
+    "method" : "POST",
+    "status-code" : "202 Accepted",
+    "file-upload" : false,
+    "path-parameters" : {
+      "pathParameters" : [ {
+        "key" : "jobid"
+      } ]
+    },
+    "query-parameters" : {
+      "queryParameters" : [ ]
+    },
+    "request" : {
+      "type" : "object",
+      "id" : 
"urn:jsonschema:org:apache:flink:runtime:rest:messages:checkpoints:CheckpointTriggerRequestBody",
+      "properties" : {
+        "checkpointType" : {
+          "type" : "string",
+          "enum" : [ "CONFIGURED", "FULL", "INCREMENTAL" ]
+        },
+        "triggerId" : {
+          "type" : "any"
+        }
+      }
+    },
+    "response" : {
+      "type" : "object",
+      "id" : 
"urn:jsonschema:org:apache:flink:runtime:rest:handler:async:TriggerResponse",
+      "properties" : {
+        "request-id" : {
+          "type" : "any"
+        }
+      }
+    }
   }, {
     "url" : "/jobs/:jobid/checkpoints/config",
     "method" : "GET",
@@ -1689,6 +1724,44 @@
         }
       }
     }
+  }, {
+    "url" : "/jobs/:jobid/checkpoints/:triggerid",
+    "method" : "GET",
+    "status-code" : "200 OK",
+    "file-upload" : false,
+    "path-parameters" : {
+      "pathParameters" : [ {
+        "key" : "jobid"
+      }, {
+        "key" : "triggerid"
+      } ]
+    },
+    "query-parameters" : {
+      "queryParameters" : [ ]
+    },
+    "request" : {
+      "type" : "any"
+    },
+    "response" : {
+      "type" : "object",
+      "id" : 
"urn:jsonschema:org:apache:flink:runtime:rest:handler:async:AsynchronousOperationResult",
+      "properties" : {
+        "status" : {
+          "type" : "object",
+          "id" : 
"urn:jsonschema:org:apache:flink:runtime:rest:messages:queue:QueueStatus",
+          "properties" : {
+            "id" : {
+              "type" : "string",
+              "required" : true,
+              "enum" : [ "IN_PROGRESS", "COMPLETED" ]
+            }
+          }
+        },
+        "operation" : {
+          "type" : "any"
+        }
+      }
+    }
   }, {
     "url" : "/jobs/:jobid/config",
     "method" : "GET",
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 0f5033ac8ed..49cc6449b3c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.checkpoint;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.execution.CheckpointType;
 import org.apache.flink.core.execution.SavepointFormatType;
 import 
org.apache.flink.runtime.checkpoint.FinishedTaskStateProvider.PartialFinishingNotSupportedByStateException;
 import org.apache.flink.runtime.checkpoint.hooks.MasterHooks;
@@ -85,6 +86,8 @@ import java.util.function.Predicate;
 import java.util.stream.Stream;
 
 import static java.util.stream.Collectors.toMap;
+import static org.apache.flink.runtime.checkpoint.CheckpointType.CHECKPOINT;
+import static 
org.apache.flink.runtime.checkpoint.CheckpointType.FULL_CHECKPOINT;
 import static org.apache.flink.util.ExceptionUtils.findThrowable;
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -511,6 +514,47 @@ public class CheckpointCoordinator {
         return triggerCheckpointFromCheckpointThread(checkpointProperties, 
null, isPeriodic);
     }
 
+    /**
+     * Triggers one new checkpoint with the given checkpointType. The returned 
future completes when
+     * the triggered checkpoint finishes or an error occurred.
+     *
+     * @param checkpointType specifies the back up type of the checkpoint to 
trigger.
+     * @return a future to the completed checkpoint.
+     */
+    public CompletableFuture<CompletedCheckpoint> 
triggerCheckpoint(CheckpointType checkpointType) {
+
+        if (checkpointType == null) {
+            throw new IllegalArgumentException("checkpointType cannot be 
null");
+        }
+
+        final SnapshotType snapshotType;
+        switch (checkpointType) {
+            case CONFIGURED:
+                snapshotType = checkpointProperties.getCheckpointType();
+                break;
+            case FULL:
+                snapshotType = FULL_CHECKPOINT;
+                break;
+            case INCREMENTAL:
+                snapshotType = CHECKPOINT;
+                break;
+            default:
+                throw new IllegalArgumentException("unknown checkpointType: " 
+ checkpointType);
+        }
+
+        final CheckpointProperties properties =
+                new CheckpointProperties(
+                        checkpointProperties.forceCheckpoint(),
+                        snapshotType,
+                        checkpointProperties.discardOnSubsumed(),
+                        checkpointProperties.discardOnJobFinished(),
+                        checkpointProperties.discardOnJobCancelled(),
+                        checkpointProperties.discardOnJobFailed(),
+                        checkpointProperties.discardOnJobSuspended(),
+                        checkpointProperties.isUnclaimed());
+        return triggerCheckpointFromCheckpointThread(properties, null, false);
+    }
+
     @VisibleForTesting
     CompletableFuture<CompletedCheckpoint> triggerCheckpoint(
             CheckpointProperties props,
@@ -734,7 +778,7 @@ public class CheckpointCoordinator {
 
         final SnapshotType type;
         if (this.forceFullSnapshot && !request.props.isSavepoint()) {
-            type = CheckpointType.FULL_CHECKPOINT;
+            type = FULL_CHECKPOINT;
         } else {
             type = request.props.getCheckpointType();
         }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointType.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointType.java
index 6ac305dc0d5..08f3927852d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointType.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointType.java
@@ -18,9 +18,12 @@
 
 package org.apache.flink.runtime.checkpoint;
 
+import org.apache.flink.annotation.Internal;
+
 import java.util.Objects;
 
 /** The type of checkpoint to perform. */
+@Internal
 public final class CheckpointType implements SnapshotType {
 
     /** A checkpoint, full or incremental. */
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index ca72be6f4d5..89c6d898e75 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -29,10 +29,12 @@ import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.ClusterOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.core.execution.CheckpointType;
 import org.apache.flink.core.execution.SavepointFormatType;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.blob.BlobServer;
 import org.apache.flink.runtime.checkpoint.Checkpoints;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
 import org.apache.flink.runtime.client.DuplicateJobSubmissionException;
 import org.apache.flink.runtime.client.JobSubmissionException;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
@@ -271,6 +273,7 @@ public abstract class Dispatcher extends 
FencedRpcEndpoint<DispatcherId>
         this.dispatcherCachedOperationsHandler =
                 new DispatcherCachedOperationsHandler(
                         dispatcherServices.getOperationCaches(),
+                        this::triggerCheckpointAndGetCheckpointID,
                         this::triggerSavepointAndGetLocation,
                         this::stopWithSavepointAndGetLocation);
 
@@ -902,6 +905,28 @@ public abstract class Dispatcher extends 
FencedRpcEndpoint<DispatcherId>
                 jobID, gateway -> gateway.triggerCheckpoint(timeout));
     }
 
+    @Override
+    public CompletableFuture<Acknowledge> triggerCheckpoint(
+            AsynchronousJobOperationKey operationKey, CheckpointType 
checkpointType, Time timeout) {
+        return dispatcherCachedOperationsHandler.triggerCheckpoint(
+                operationKey, checkpointType, timeout);
+    }
+
+    @Override
+    public CompletableFuture<OperationResult<Long>> 
getTriggeredCheckpointStatus(
+            AsynchronousJobOperationKey operationKey) {
+        return 
dispatcherCachedOperationsHandler.getCheckpointStatus(operationKey);
+    }
+
+    private CompletableFuture<Long> triggerCheckpointAndGetCheckpointID(
+            final JobID jobID, final CheckpointType checkpointType, final Time 
timeout) {
+        return performOperationOnJobMasterGateway(
+                jobID,
+                gateway ->
+                        gateway.triggerCheckpoint(checkpointType, timeout)
+                                
.thenApply(CompletedCheckpoint::getCheckpointID));
+    }
+
     @Override
     public CompletableFuture<Acknowledge> triggerSavepoint(
             final AsynchronousJobOperationKey operationKey,
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherCachedOperationsHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherCachedOperationsHandler.java
index 90daa799cd1..046e51ec404 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherCachedOperationsHandler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherCachedOperationsHandler.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.dispatcher;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.core.execution.CheckpointType;
 import org.apache.flink.core.execution.SavepointFormatType;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.rest.handler.async.CompletedOperationCache;
@@ -39,15 +40,22 @@ public class DispatcherCachedOperationsHandler {
     private final CompletedOperationCache<AsynchronousJobOperationKey, String>
             savepointTriggerCache;
 
+    private final CompletedOperationCache<AsynchronousJobOperationKey, Long> 
checkpointTriggerCache;
+
+    private final TriggerCheckpointFunction triggerCheckpointFunction;
+
     private final TriggerSavepointFunction triggerSavepointFunction;
 
     private final TriggerSavepointFunction stopWithSavepointFunction;
 
     DispatcherCachedOperationsHandler(
             DispatcherOperationCaches operationCaches,
+            TriggerCheckpointFunction triggerCheckpointFunction,
             TriggerSavepointFunction triggerSavepointFunction,
             TriggerSavepointFunction stopWithSavepointFunction) {
         this(
+                triggerCheckpointFunction,
+                operationCaches.getCheckpointTriggerCache(),
                 triggerSavepointFunction,
                 stopWithSavepointFunction,
                 operationCaches.getSavepointTriggerCache());
@@ -55,14 +63,41 @@ public class DispatcherCachedOperationsHandler {
 
     @VisibleForTesting
     DispatcherCachedOperationsHandler(
+            TriggerCheckpointFunction triggerCheckpointFunction,
+            CompletedOperationCache<AsynchronousJobOperationKey, Long> 
checkpointTriggerCache,
             TriggerSavepointFunction triggerSavepointFunction,
             TriggerSavepointFunction stopWithSavepointFunction,
             CompletedOperationCache<AsynchronousJobOperationKey, String> 
savepointTriggerCache) {
+        this.triggerCheckpointFunction = triggerCheckpointFunction;
+        this.checkpointTriggerCache = checkpointTriggerCache;
         this.triggerSavepointFunction = triggerSavepointFunction;
         this.stopWithSavepointFunction = stopWithSavepointFunction;
         this.savepointTriggerCache = savepointTriggerCache;
     }
 
+    public CompletableFuture<Acknowledge> triggerCheckpoint(
+            AsynchronousJobOperationKey operationKey, CheckpointType 
checkpointType, Time timeout) {
+
+        if (!checkpointTriggerCache.containsOperation(operationKey)) {
+            checkpointTriggerCache.registerOngoingOperation(
+                    operationKey,
+                    triggerCheckpointFunction.apply(
+                            operationKey.getJobId(), checkpointType, timeout));
+        }
+
+        return CompletableFuture.completedFuture(Acknowledge.get());
+    }
+
+    public CompletableFuture<OperationResult<Long>> getCheckpointStatus(
+            AsynchronousJobOperationKey operationKey) {
+        return checkpointTriggerCache
+                .get(operationKey)
+                .map(CompletableFuture::completedFuture)
+                .orElse(
+                        FutureUtils.completedExceptionally(
+                                new 
UnknownOperationKeyException(operationKey)));
+    }
+
     public CompletableFuture<Acknowledge> triggerSavepoint(
             AsynchronousJobOperationKey operationKey,
             String targetDirectory,
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherOperationCaches.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherOperationCaches.java
index 36331796bb8..85851851b29 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherOperationCaches.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherOperationCaches.java
@@ -22,8 +22,10 @@ import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.runtime.rest.handler.async.CompletedOperationCache;
 import org.apache.flink.runtime.rest.handler.job.AsynchronousJobOperationKey;
 import org.apache.flink.util.AutoCloseableAsync;
+import org.apache.flink.util.concurrent.FutureUtils;
 
 import java.time.Duration;
+import java.util.Arrays;
 import java.util.concurrent.CompletableFuture;
 
 /**
@@ -33,6 +35,8 @@ public class DispatcherOperationCaches implements 
AutoCloseableAsync {
     private final CompletedOperationCache<AsynchronousJobOperationKey, String>
             savepointTriggerCache;
 
+    private final CompletedOperationCache<AsynchronousJobOperationKey, Long> 
checkpointTriggerCache;
+
     @VisibleForTesting
     public DispatcherOperationCaches() {
         this(RestOptions.ASYNC_OPERATION_STORE_DURATION.defaultValue());
@@ -41,14 +45,21 @@ public class DispatcherOperationCaches implements 
AutoCloseableAsync {
     @VisibleForTesting
     public DispatcherOperationCaches(Duration cacheDuration) {
         savepointTriggerCache = new CompletedOperationCache<>(cacheDuration);
+        checkpointTriggerCache = new CompletedOperationCache<>(cacheDuration);
     }
 
     public CompletedOperationCache<AsynchronousJobOperationKey, String> 
getSavepointTriggerCache() {
         return savepointTriggerCache;
     }
 
+    public CompletedOperationCache<AsynchronousJobOperationKey, Long> 
getCheckpointTriggerCache() {
+        return checkpointTriggerCache;
+    }
+
     @Override
     public CompletableFuture<Void> closeAsync() {
-        return savepointTriggerCache.closeAsync();
+        return FutureUtils.completeAll(
+                Arrays.asList(
+                        savepointTriggerCache.closeAsync(), 
checkpointTriggerCache.closeAsync()));
     }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/TriggerCheckpointFunction.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/TriggerCheckpointFunction.java
new file mode 100644
index 00000000000..a874972f91c
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/TriggerCheckpointFunction.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.core.execution.CheckpointType;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Wrapper interface for functions triggering checkpoints. Currently only 
serves to shorten
+ * signatures.
+ */
+@FunctionalInterface
+public interface TriggerCheckpointFunction {
+    CompletableFuture<Long> apply(JobID jobId, CheckpointType checkpointType, 
Time timeout);
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 3dfaa1c8776..27c81b30acb 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -24,6 +24,7 @@ import 
org.apache.flink.api.common.functions.AggregateFunction;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.core.execution.CheckpointType;
 import org.apache.flink.core.execution.SavepointFormatType;
 import org.apache.flink.queryablestate.KvStateID;
 import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
@@ -33,6 +34,7 @@ import org.apache.flink.runtime.blocklist.BlocklistContext;
 import org.apache.flink.runtime.blocklist.BlocklistHandler;
 import org.apache.flink.runtime.blocklist.BlocklistUtils;
 import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
 import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
@@ -853,6 +855,12 @@ public class JobMaster extends 
FencedRpcEndpoint<JobMasterId>
         return CompletableFuture.completedFuture(schedulerNG.requestJob());
     }
 
+    @Override
+    public CompletableFuture<CompletedCheckpoint> triggerCheckpoint(
+            final CheckpointType checkpointType, final Time timeout) {
+        return schedulerNG.triggerCheckpoint(checkpointType);
+    }
+
     @Override
     public CompletableFuture<String> triggerSavepoint(
             @Nullable final String targetDirectory,
@@ -863,11 +871,6 @@ public class JobMaster extends 
FencedRpcEndpoint<JobMasterId>
         return schedulerNG.triggerSavepoint(targetDirectory, cancelJob, 
formatType);
     }
 
-    @Override
-    public CompletableFuture<String> triggerCheckpoint(Time timeout) {
-        return schedulerNG.triggerCheckpoint();
-    }
-
     @Override
     public CompletableFuture<String> stopWithSavepoint(
             @Nullable final String targetDirectory,
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
index 4cbd4682d66..c99fda1546e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
@@ -22,9 +22,11 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.api.common.functions.AggregateFunction;
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.core.execution.CheckpointType;
 import org.apache.flink.core.execution.SavepointFormatType;
 import org.apache.flink.runtime.blocklist.BlocklistListener;
 import org.apache.flink.runtime.checkpoint.CheckpointCoordinatorGateway;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.execution.ExecutionState;
@@ -218,13 +220,26 @@ public interface JobMasterGateway
             final SavepointFormatType formatType,
             @RpcTimeout final Time timeout);
 
+    /**
+     * Triggers taking a checkpoint of the executed job.
+     *
+     * @param checkpointType to determine how checkpoint should be taken
+     * @param timeout for the rpc call
+     * @return Future which is completed with the CompletedCheckpoint once 
completed
+     */
+    CompletableFuture<CompletedCheckpoint> triggerCheckpoint(
+            final CheckpointType checkpointType, @RpcTimeout final Time 
timeout);
+
     /**
      * Triggers taking a checkpoint of the executed job.
      *
      * @param timeout for the rpc call
      * @return Future which is completed with the checkpoint path once 
completed
      */
-    CompletableFuture<String> triggerCheckpoint(@RpcTimeout final Time 
timeout);
+    default CompletableFuture<String> triggerCheckpoint(@RpcTimeout final Time 
timeout) {
+        return triggerCheckpoint(CheckpointType.DEFAULT, timeout)
+                .thenApply(CompletedCheckpoint::getExternalPointer);
+    }
 
     /**
      * Stops the job with a savepoint.
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointHandlers.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointHandlers.java
new file mode 100644
index 00000000000..871a2dfdabf
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointHandlers.java
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job.checkpoints;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.core.execution.CheckpointType;
+import org.apache.flink.runtime.dispatcher.UnknownOperationKeyException;
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.handler.async.AsynchronousOperationResult;
+import org.apache.flink.runtime.rest.handler.async.TriggerResponse;
+import org.apache.flink.runtime.rest.handler.job.AsynchronousJobOperationKey;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
+import org.apache.flink.runtime.rest.messages.TriggerId;
+import org.apache.flink.runtime.rest.messages.TriggerIdPathParameter;
+import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointInfo;
+import 
org.apache.flink.runtime.rest.messages.checkpoints.CheckpointStatusHeaders;
+import 
org.apache.flink.runtime.rest.messages.checkpoints.CheckpointStatusMessageParameters;
+import 
org.apache.flink.runtime.rest.messages.checkpoints.CheckpointTriggerHeaders;
+import 
org.apache.flink.runtime.rest.messages.checkpoints.CheckpointTriggerMessageParameters;
+import 
org.apache.flink.runtime.rest.messages.checkpoints.CheckpointTriggerRequestBody;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.SerializedThrowable;
+
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import javax.annotation.Nonnull;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+
+/**
+ * HTTP handlers for asynchronous triggering of checkpoints.
+ *
+ * <p>Drawing checkpoints is a potentially long-running operation. To avoid 
blocking HTTP
+ * connections, checkpoints must be drawn in two steps. First, an HTTP request 
is issued to trigger
+ * the checkpoint asynchronously. The request will be assigned a {@link 
TriggerId}, which is
+ * returned in the response body. Next, the returned {@link TriggerId} should 
be used to poll the
+ * status of the checkpoint until it is finished.
+ *
+ * <p>A checkpoint is triggered by sending an HTTP {@code POST} request to 
{@code
+ * /jobs/:jobid/checkpoints}. The HTTP request may contain a JSON body to 
specify a customized
+ * {@link TriggerId} and a {@link CheckpointType}, e.g.,
+ *
+ * <pre>
+ * { "triggerId": "7d273f5a62eb4730b9dea8e833733c1e", "checkpointType": "FULL" 
}
+ * </pre>
+ *
+ * <p>If the body is omitted, or the field {@code checkpointType} is {@code 
null}, the default
+ * checkpointType of {@link CheckpointType#CONFIGURED} will be used. As 
written above, the response
+ * will contain a request id, e.g.,
+ *
+ * <pre>
+ * { "request-id": "7d273f5a62eb4730b9dea8e833733c1e" }
+ * </pre>
+ *
+ * <p>To poll for the status of an ongoing checkpoint, an HTTP {@code GET} 
request is issued to
+ * {@code /jobs/:jobid/checkpoints/:checkpointtriggerid}. If the specified 
checkpoint is still
+ * ongoing, the response will be
+ *
+ * <pre>
+ * {
+ *     "status": {
+ *         "id": "IN_PROGRESS"
+ *     }
+ * }
+ * </pre>
+ *
+ * <p>If the specified checkpoints has completed, the status id will 
transition to {@code
+ * COMPLETED}, and the response will additionally contain information about 
the savepoint, such as
+ * the location:
+ *
+ * <pre>
+ * {
+ *     "status": {
+ *         "id": "COMPLETED"
+ *     },
+ *     "operation": {
+ *         "checkpointId": "123"
+ *     }
+ * }
+ * </pre>
+ */
+public class CheckpointHandlers {
+
+    /** Handler for the checkpoint trigger operation. */
+    public static class CheckpointTriggerHandler
+            extends AbstractRestHandler<
+                    RestfulGateway,
+                    CheckpointTriggerRequestBody,
+                    TriggerResponse,
+                    CheckpointTriggerMessageParameters> {
+
+        public CheckpointTriggerHandler(
+                final GatewayRetriever<? extends RestfulGateway> 
leaderRetriever,
+                final Time timeout,
+                final Map<String, String> responseHeaders) {
+            super(
+                    leaderRetriever,
+                    timeout,
+                    responseHeaders,
+                    CheckpointTriggerHeaders.getInstance());
+        }
+
+        private static AsynchronousJobOperationKey createOperationKey(
+                final HandlerRequest<CheckpointTriggerRequestBody> request) {
+            final JobID jobId = 
request.getPathParameter(JobIDPathParameter.class);
+            return AsynchronousJobOperationKey.of(
+                    
request.getRequestBody().getTriggerId().orElseGet(TriggerId::new), jobId);
+        }
+
+        @Override
+        protected CompletableFuture<TriggerResponse> handleRequest(
+                @Nonnull HandlerRequest<CheckpointTriggerRequestBody> request,
+                @Nonnull RestfulGateway gateway)
+                throws RestHandlerException {
+            final AsynchronousJobOperationKey operationKey = 
createOperationKey(request);
+
+            return gateway.triggerCheckpoint(
+                            operationKey,
+                            request.getRequestBody().getCheckpointType(),
+                            RpcUtils.INF_TIMEOUT)
+                    .handle(
+                            (acknowledge, throwable) -> {
+                                if (throwable == null) {
+                                    return new 
TriggerResponse(operationKey.getTriggerId());
+                                } else {
+                                    throw new CompletionException(
+                                            createInternalServerError(
+                                                    throwable, operationKey, 
"triggering"));
+                                }
+                            });
+        }
+    }
+
+    /** HTTP handler to query for the status of the checkpoint. */
+    public static class CheckpointStatusHandler
+            extends AbstractRestHandler<
+                    RestfulGateway,
+                    EmptyRequestBody,
+                    AsynchronousOperationResult<CheckpointInfo>,
+                    CheckpointStatusMessageParameters> {
+
+        public CheckpointStatusHandler(
+                final GatewayRetriever<? extends RestfulGateway> 
leaderRetriever,
+                final Time timeout,
+                final Map<String, String> responseHeaders) {
+            super(leaderRetriever, timeout, responseHeaders, 
CheckpointStatusHeaders.getInstance());
+        }
+
+        @Override
+        public CompletableFuture<AsynchronousOperationResult<CheckpointInfo>> 
handleRequest(
+                @Nonnull HandlerRequest<EmptyRequestBody> request, @Nonnull 
RestfulGateway gateway)
+                throws RestHandlerException {
+
+            final AsynchronousJobOperationKey key = getOperationKey(request);
+
+            return gateway.getTriggeredCheckpointStatus(key)
+                    .handle(
+                            (operationResult, throwable) -> {
+                                if (throwable == null) {
+                                    switch (operationResult.getStatus()) {
+                                        case SUCCESS:
+                                            return 
AsynchronousOperationResult.completed(
+                                                    operationResultResponse(
+                                                            
operationResult.getResult()));
+                                        case FAILURE:
+                                            return 
AsynchronousOperationResult.completed(
+                                                    
exceptionalOperationResultResponse(
+                                                            
operationResult.getThrowable()));
+                                        case IN_PROGRESS:
+                                            return 
AsynchronousOperationResult.inProgress();
+                                        default:
+                                            throw new IllegalStateException(
+                                                    "No handler for operation 
status "
+                                                            + 
operationResult.getStatus()
+                                                            + ", encountered 
for key "
+                                                            + key);
+                                    }
+                                } else {
+                                    throw new CompletionException(
+                                            
maybeCreateNotFoundError(throwable, key)
+                                                    .orElseGet(
+                                                            () ->
+                                                                    
createInternalServerError(
+                                                                            
throwable,
+                                                                            
key,
+                                                                            
"retrieving status of")));
+                                }
+                            });
+        }
+
+        private static Optional<RestHandlerException> maybeCreateNotFoundError(
+                Throwable throwable, AsynchronousJobOperationKey key) {
+            if (ExceptionUtils.findThrowable(throwable, 
UnknownOperationKeyException.class)
+                    .isPresent()) {
+                return Optional.of(
+                        new RestHandlerException(
+                                String.format(
+                                        "There is no checkpoint operation with 
triggerId=%s for job %s.",
+                                        key.getTriggerId(), key.getJobId()),
+                                HttpResponseStatus.NOT_FOUND));
+            }
+            return Optional.empty();
+        }
+
+        private static AsynchronousJobOperationKey getOperationKey(
+                HandlerRequest<EmptyRequestBody> request) {
+            final TriggerId triggerId = 
request.getPathParameter(TriggerIdPathParameter.class);
+            final JobID jobId = 
request.getPathParameter(JobIDPathParameter.class);
+            return AsynchronousJobOperationKey.of(triggerId, jobId);
+        }
+
+        private static CheckpointInfo exceptionalOperationResultResponse(
+                final Throwable throwable) {
+            return new CheckpointInfo(null, new 
SerializedThrowable(throwable));
+        }
+
+        private static CheckpointInfo operationResultResponse(final Long 
checkpointId) {
+            return new CheckpointInfo(checkpointId, null);
+        }
+    }
+
+    private static RestHandlerException createInternalServerError(
+            Throwable throwable, AsynchronousJobOperationKey key, String 
errorMessageInfix) {
+        return new RestHandlerException(
+                String.format(
+                        "Internal server error while %s checkpoint operation 
with triggerId=%s for job %s.",
+                        errorMessageInfix, key.getTriggerId(), key.getJobId()),
+                HttpResponseStatus.INTERNAL_SERVER_ERROR,
+                throwable);
+    }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/savepoints/SavepointInfo.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointInfo.java
similarity index 75%
copy from 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/savepoints/SavepointInfo.java
copy to 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointInfo.java
index 33cd1530e48..62639b839fa 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/savepoints/SavepointInfo.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointInfo.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.runtime.rest.messages.job.savepoints;
+package org.apache.flink.runtime.rest.messages.checkpoints;
 
 import org.apache.flink.runtime.rest.messages.ResponseBody;
 import 
org.apache.flink.runtime.rest.messages.json.SerializedThrowableDeserializer;
@@ -24,6 +24,7 @@ import 
org.apache.flink.runtime.rest.messages.json.SerializedThrowableSerializer
 import org.apache.flink.util.SerializedThrowable;
 
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize;
@@ -33,17 +34,17 @@ import javax.annotation.Nullable;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
 
-/** Represents information about a finished savepoint. */
+/** Represents information about a triggered checkpoint. */
 @JsonInclude(JsonInclude.Include.NON_NULL)
-public class SavepointInfo implements ResponseBody {
+public class CheckpointInfo implements ResponseBody {
 
-    private static final String FIELD_NAME_LOCATION = "location";
+    private static final String FIELD_NAME_CHECKPOINT_ID = "checkpointId";
 
-    private static final String FIELD_NAME_FAILURE_CAUSE = "failure-cause";
+    private static final String FIELD_NAME_FAILURE_CAUSE = "failureCause";
 
-    @JsonProperty(FIELD_NAME_LOCATION)
+    @JsonProperty(FIELD_NAME_CHECKPOINT_ID)
     @Nullable
-    private final String location;
+    private final Long checkpointId;
 
     @JsonProperty(FIELD_NAME_FAILURE_CAUSE)
     @JsonSerialize(using = SerializedThrowableSerializer.class)
@@ -52,26 +53,28 @@ public class SavepointInfo implements ResponseBody {
     private final SerializedThrowable failureCause;
 
     @JsonCreator
-    public SavepointInfo(
-            @JsonProperty(FIELD_NAME_LOCATION) @Nullable final String location,
+    public CheckpointInfo(
+            @JsonProperty(FIELD_NAME_CHECKPOINT_ID) @Nullable final Long 
checkpointId,
             @JsonProperty(FIELD_NAME_FAILURE_CAUSE)
                     @JsonDeserialize(using = 
SerializedThrowableDeserializer.class)
                     @Nullable
                     final SerializedThrowable failureCause) {
         checkArgument(
-                location != null ^ failureCause != null,
-                "Either location or failureCause must be set");
+                checkpointId != null ^ failureCause != null,
+                "Either checkpointId or failureCause must be set");
 
-        this.location = location;
+        this.checkpointId = checkpointId;
         this.failureCause = failureCause;
     }
 
     @Nullable
-    public String getLocation() {
-        return location;
+    @JsonIgnore
+    public Long getCheckpointId() {
+        return checkpointId;
     }
 
     @Nullable
+    @JsonIgnore
     public SerializedThrowable getFailureCause() {
         return failureCause;
     }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointStatusHeaders.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointStatusHeaders.java
new file mode 100644
index 00000000000..ef6aa5994b2
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointStatusHeaders.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.checkpoints;
+
+import org.apache.flink.runtime.rest.HttpMethodWrapper;
+import 
org.apache.flink.runtime.rest.handler.async.AsynchronousOperationStatusMessageHeaders;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
+import org.apache.flink.runtime.rest.messages.TriggerIdPathParameter;
+
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+/** These headers define the protocol for querying the status of a checkpoint 
operation. */
+public class CheckpointStatusHeaders
+        extends AsynchronousOperationStatusMessageHeaders<
+                CheckpointInfo, CheckpointStatusMessageParameters> {
+
+    private static final CheckpointStatusHeaders INSTANCE = new 
CheckpointStatusHeaders();
+
+    private static final String URL =
+            String.format(
+                    "/jobs/:%s/checkpoints/:%s",
+                    JobIDPathParameter.KEY, TriggerIdPathParameter.KEY);
+
+    private CheckpointStatusHeaders() {}
+
+    @Override
+    public Class<EmptyRequestBody> getRequestClass() {
+        return EmptyRequestBody.class;
+    }
+
+    @Override
+    public HttpResponseStatus getResponseStatusCode() {
+        return HttpResponseStatus.OK;
+    }
+
+    @Override
+    public CheckpointStatusMessageParameters getUnresolvedMessageParameters() {
+        return new CheckpointStatusMessageParameters();
+    }
+
+    @Override
+    public HttpMethodWrapper getHttpMethod() {
+        return HttpMethodWrapper.GET;
+    }
+
+    @Override
+    public String getTargetRestEndpointURL() {
+        return URL;
+    }
+
+    @Override
+    public Class<CheckpointInfo> getValueClass() {
+        return CheckpointInfo.class;
+    }
+
+    @Override
+    public String getDescription() {
+        return "Returns the status of a checkpoint trigger operation.";
+    }
+
+    public static CheckpointStatusHeaders getInstance() {
+        return INSTANCE;
+    }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointStatusMessageParameters.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointStatusMessageParameters.java
new file mode 100644
index 00000000000..d0b0d8b7816
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointStatusMessageParameters.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.checkpoints;
+
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
+import org.apache.flink.runtime.rest.messages.MessageParameters;
+import org.apache.flink.runtime.rest.messages.MessagePathParameter;
+import org.apache.flink.runtime.rest.messages.MessageQueryParameter;
+import org.apache.flink.runtime.rest.messages.TriggerIdPathParameter;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+
+/** The parameters for triggering a checkpoint. */
+public class CheckpointStatusMessageParameters extends MessageParameters {
+
+    public final JobIDPathParameter jobIdPathParameter = new 
JobIDPathParameter();
+
+    public final TriggerIdPathParameter triggerIdPathParameter = new 
TriggerIdPathParameter();
+
+    @Override
+    public Collection<MessagePathParameter<?>> getPathParameters() {
+        return Collections.unmodifiableCollection(
+                Arrays.asList(jobIdPathParameter, triggerIdPathParameter));
+    }
+
+    @Override
+    public Collection<MessageQueryParameter<?>> getQueryParameters() {
+        return Collections.emptyList();
+    }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointTriggerHeaders.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointTriggerHeaders.java
new file mode 100644
index 00000000000..6592e67f06d
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointTriggerHeaders.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.checkpoints;
+
+import org.apache.flink.runtime.rest.HttpMethodWrapper;
+import 
org.apache.flink.runtime.rest.handler.async.AsynchronousOperationTriggerMessageHeaders;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
+
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+/** These headers define the protocol for triggering a checkpoint. */
+public class CheckpointTriggerHeaders
+        extends AsynchronousOperationTriggerMessageHeaders<
+                CheckpointTriggerRequestBody, 
CheckpointTriggerMessageParameters> {
+
+    private static final CheckpointTriggerHeaders INSTANCE = new 
CheckpointTriggerHeaders();
+
+    private static final String URL =
+            String.format("/jobs/:%s/checkpoints", JobIDPathParameter.KEY);
+
+    private CheckpointTriggerHeaders() {}
+
+    @Override
+    public Class<CheckpointTriggerRequestBody> getRequestClass() {
+        return CheckpointTriggerRequestBody.class;
+    }
+
+    @Override
+    public HttpResponseStatus getResponseStatusCode() {
+        return HttpResponseStatus.ACCEPTED;
+    }
+
+    @Override
+    public CheckpointTriggerMessageParameters getUnresolvedMessageParameters() 
{
+        return new CheckpointTriggerMessageParameters();
+    }
+
+    @Override
+    public HttpMethodWrapper getHttpMethod() {
+        return HttpMethodWrapper.POST;
+    }
+
+    @Override
+    public String getTargetRestEndpointURL() {
+        return URL;
+    }
+
+    @Override
+    protected String getAsyncOperationDescription() {
+        return "Triggers a checkpoint.";
+    }
+
+    @Override
+    public String operationId() {
+        return "triggerCheckpoint";
+    }
+
+    public static CheckpointTriggerHeaders getInstance() {
+        return INSTANCE;
+    }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointTriggerMessageParameters.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointTriggerMessageParameters.java
new file mode 100644
index 00000000000..293eb59af4d
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointTriggerMessageParameters.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.checkpoints;
+
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
+import org.apache.flink.runtime.rest.messages.MessageParameters;
+import org.apache.flink.runtime.rest.messages.MessagePathParameter;
+import org.apache.flink.runtime.rest.messages.MessageQueryParameter;
+
+import java.util.Collection;
+import java.util.Collections;
+
+/** The parameters for triggering a checkpoint. */
+public class CheckpointTriggerMessageParameters extends MessageParameters {
+
+    public final JobIDPathParameter jobID = new JobIDPathParameter();
+
+    @Override
+    public Collection<MessagePathParameter<?>> getPathParameters() {
+        return Collections.singleton(jobID);
+    }
+
+    @Override
+    public Collection<MessageQueryParameter<?>> getQueryParameters() {
+        return Collections.emptyList();
+    }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointTriggerRequestBody.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointTriggerRequestBody.java
new file mode 100644
index 00000000000..1871ef56e3d
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointTriggerRequestBody.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.checkpoints;
+
+import org.apache.flink.core.execution.CheckpointType;
+import org.apache.flink.runtime.rest.messages.RequestBody;
+import org.apache.flink.runtime.rest.messages.TriggerId;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import javax.annotation.Nullable;
+
+import java.util.Optional;
+
+/** {@link RequestBody} to trigger checkpoints. */
+public class CheckpointTriggerRequestBody implements RequestBody {
+
+    private static final String FIELD_NAME_TRIGGER_ID = "triggerId";
+    private static final String FIELD_NAME_CHECKPOINT_TYPE = "checkpointType";
+
+    @JsonProperty(FIELD_NAME_TRIGGER_ID)
+    @Nullable
+    private final TriggerId triggerId;
+
+    @JsonProperty(FIELD_NAME_CHECKPOINT_TYPE)
+    private final CheckpointType checkpointType;
+
+    @JsonCreator
+    public CheckpointTriggerRequestBody(
+            @Nullable @JsonProperty(FIELD_NAME_CHECKPOINT_TYPE) final 
CheckpointType checkpointType,
+            @Nullable @JsonProperty(FIELD_NAME_TRIGGER_ID) TriggerId 
triggerId) {
+        this.triggerId = triggerId;
+        this.checkpointType = checkpointType != null ? checkpointType : 
CheckpointType.DEFAULT;
+    }
+
+    @JsonIgnore
+    public Optional<TriggerId> getTriggerId() {
+        return Optional.ofNullable(triggerId);
+    }
+
+    @JsonIgnore
+    public CheckpointType getCheckpointType() {
+        return checkpointType;
+    }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/savepoints/SavepointInfo.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/savepoints/SavepointInfo.java
index 33cd1530e48..174405edbea 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/savepoints/SavepointInfo.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/savepoints/SavepointInfo.java
@@ -24,6 +24,7 @@ import 
org.apache.flink.runtime.rest.messages.json.SerializedThrowableSerializer
 import org.apache.flink.util.SerializedThrowable;
 
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize;
@@ -67,11 +68,13 @@ public class SavepointInfo implements ResponseBody {
     }
 
     @Nullable
+    @JsonIgnore
     public String getLocation() {
         return location;
     }
 
     @Nullable
+    @JsonIgnore
     public SerializedThrowable getFailureCause() {
         return failureCause;
     }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
index c822dc92c81..02c19f92391 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
@@ -26,6 +26,7 @@ import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.MetricOptions;
 import org.apache.flink.configuration.WebOptions;
+import org.apache.flink.core.execution.CheckpointType;
 import org.apache.flink.core.execution.SavepointFormatType;
 import org.apache.flink.metrics.Gauge;
 import org.apache.flink.metrics.MetricGroup;
@@ -878,7 +879,7 @@ public abstract class SchedulerBase implements SchedulerNG, 
CheckpointScheduling
     }
 
     @Override
-    public CompletableFuture<String> triggerCheckpoint() {
+    public CompletableFuture<CompletedCheckpoint> 
triggerCheckpoint(CheckpointType checkpointType) {
         mainThreadExecutor.assertRunningInMainThread();
 
         final CheckpointCoordinator checkpointCoordinator =
@@ -890,8 +891,7 @@ public abstract class SchedulerBase implements SchedulerNG, 
CheckpointScheduling
         log.info("Triggering a manual checkpoint for job {}.", jobID);
 
         return checkpointCoordinator
-                .triggerCheckpoint(false)
-                .thenApply(CompletedCheckpoint::getExternalPointer)
+                .triggerCheckpoint(checkpointType)
                 .handleAsync(
                         (path, throwable) -> {
                             if (throwable != null) {
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerNG.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerNG.java
index 68877e287cd..70b0c3b8807 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerNG.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerNG.java
@@ -21,10 +21,12 @@ package org.apache.flink.runtime.scheduler;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.core.execution.CheckpointType;
 import org.apache.flink.core.execution.SavepointFormatType;
 import org.apache.flink.queryablestate.KvStateID;
 import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
 import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
 import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
 import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
 import org.apache.flink.runtime.execution.ExecutionState;
@@ -125,7 +127,7 @@ public interface SchedulerNG extends GlobalFailureHandler, 
AutoCloseableAsync {
     CompletableFuture<String> triggerSavepoint(
             @Nullable String targetDirectory, boolean cancelJob, 
SavepointFormatType formatType);
 
-    CompletableFuture<String> triggerCheckpoint();
+    CompletableFuture<CompletedCheckpoint> triggerCheckpoint(CheckpointType 
checkpointType);
 
     void acknowledgeCheckpoint(
             JobID jobID,
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
index d76939bbb57..b8417a0f426 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
@@ -26,6 +26,7 @@ import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.MetricOptions;
 import org.apache.flink.configuration.SchedulerExecutionMode;
 import org.apache.flink.configuration.WebOptions;
+import org.apache.flink.core.execution.CheckpointType;
 import org.apache.flink.core.execution.SavepointFormatType;
 import org.apache.flink.queryablestate.KvStateID;
 import org.apache.flink.runtime.JobException;
@@ -37,6 +38,7 @@ import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
 import org.apache.flink.runtime.checkpoint.CheckpointScheduling;
 import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
 import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
 import org.apache.flink.runtime.client.JobExecutionException;
@@ -623,10 +625,11 @@ public class AdaptiveScheduler
     }
 
     @Override
-    public CompletableFuture<String> triggerCheckpoint() {
+    public CompletableFuture<CompletedCheckpoint> 
triggerCheckpoint(CheckpointType checkpointType) {
         return state.tryCall(
                         StateWithExecutionGraph.class,
-                        StateWithExecutionGraph::triggerCheckpoint,
+                        stateWithExecutionGraph ->
+                                
stateWithExecutionGraph.triggerCheckpoint(checkpointType),
                         "triggerCheckpoint")
                 .orElse(
                         FutureUtils.completedExceptionally(
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateWithExecutionGraph.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateWithExecutionGraph.java
index 8ba39311714..953f36a2573 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateWithExecutionGraph.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateWithExecutionGraph.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.scheduler.adaptive;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.core.execution.CheckpointType;
 import org.apache.flink.core.execution.SavepointFormatType;
 import org.apache.flink.queryablestate.KvStateID;
 import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
@@ -275,7 +276,7 @@ abstract class StateWithExecutionGraph implements State {
                         context.getMainThreadExecutor());
     }
 
-    CompletableFuture<String> triggerCheckpoint() {
+    CompletableFuture<CompletedCheckpoint> triggerCheckpoint(CheckpointType 
checkpointType) {
         final CheckpointCoordinator checkpointCoordinator =
                 executionGraph.getCheckpointCoordinator();
         final JobID jobID = executionGraph.getJobID();
@@ -286,8 +287,7 @@ abstract class StateWithExecutionGraph implements State {
         logger.info("Triggering a checkpoint for job {}.", jobID);
 
         return checkpointCoordinator
-                .triggerCheckpoint(false)
-                .thenApply(CompletedCheckpoint::getExternalPointer)
+                .triggerCheckpoint(checkpointType)
                 .handleAsync(
                         (path, throwable) -> {
                             if (throwable != null) {
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java
index 44e189da515..4f9085cda2e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java
@@ -22,7 +22,9 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.execution.CheckpointType;
 import org.apache.flink.core.execution.SavepointFormatType;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.dispatcher.TriggerSavepointMode;
 import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
@@ -143,6 +145,34 @@ public interface RestfulGateway extends RpcGateway {
      */
     CompletableFuture<ThreadDumpInfo> requestThreadDump(@RpcTimeout Time 
timeout);
 
+    /**
+     * Triggers a checkpoint with the given savepoint directory as a target.
+     *
+     * @param operationKey the key of the operation, for deduplication purposes
+     * @param checkpointType checkpoint backup type (configured / full / 
incremental)
+     * @param timeout Timeout for the asynchronous operation
+     * @return A future to the {@link CompletedCheckpoint#getExternalPointer() 
external pointer} of
+     *     the savepoint.
+     */
+    default CompletableFuture<Acknowledge> triggerCheckpoint(
+            AsynchronousJobOperationKey operationKey,
+            CheckpointType checkpointType,
+            @RpcTimeout Time timeout) {
+        throw new UnsupportedOperationException();
+    }
+
+    /**
+     * Get the status of a checkpoint triggered under the specified operation 
key.
+     *
+     * @param operationKey key of the operation
+     * @return Future which completes immediately with the status, or fails if 
no operation is
+     *     registered for the key
+     */
+    default CompletableFuture<OperationResult<Long>> 
getTriggeredCheckpointStatus(
+            AsynchronousJobOperationKey operationKey) {
+        throw new UnsupportedOperationException();
+    }
+
     /**
      * Triggers a savepoint with the given savepoint directory as a target, 
returning a future that
      * completes when the operation is started.
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
index 6a0c4320a3b..259495a3f33 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
@@ -67,6 +67,7 @@ import 
org.apache.flink.runtime.rest.handler.job.SubtaskExecutionAttemptDetailsH
 import 
org.apache.flink.runtime.rest.handler.job.SubtasksAllAccumulatorsHandler;
 import org.apache.flink.runtime.rest.handler.job.SubtasksTimesHandler;
 import 
org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointConfigHandler;
+import 
org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointHandlers;
 import 
org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointStatisticDetailsHandler;
 import 
org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointStatsCache;
 import 
org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointingStatisticsHandler;
@@ -545,6 +546,14 @@ public class WebMonitorEndpoint<T extends RestfulGateway> 
extends RestServerEndp
                 new SavepointHandlers.SavepointStatusHandler(
                         leaderRetriever, timeout, responseHeaders);
 
+        final CheckpointHandlers.CheckpointTriggerHandler 
checkpointTriggerHandler =
+                new CheckpointHandlers.CheckpointTriggerHandler(
+                        leaderRetriever, timeout, responseHeaders);
+
+        final CheckpointHandlers.CheckpointStatusHandler 
checkpointStatusHandler =
+                new CheckpointHandlers.CheckpointStatusHandler(
+                        leaderRetriever, timeout, responseHeaders);
+
         final SubtaskExecutionAttemptDetailsHandler 
subtaskExecutionAttemptDetailsHandler =
                 new SubtaskExecutionAttemptDetailsHandler(
                         leaderRetriever,
@@ -776,6 +785,10 @@ public class WebMonitorEndpoint<T extends RestfulGateway> 
extends RestServerEndp
         handlers.add(
                 Tuple2.of(stopWithSavepointHandler.getMessageHeaders(), 
stopWithSavepointHandler));
         handlers.add(Tuple2.of(savepointStatusHandler.getMessageHeaders(), 
savepointStatusHandler));
+        handlers.add(
+                Tuple2.of(checkpointTriggerHandler.getMessageHeaders(), 
checkpointTriggerHandler));
+        handlers.add(
+                Tuple2.of(checkpointStatusHandler.getMessageHeaders(), 
checkpointStatusHandler));
         handlers.add(
                 Tuple2.of(
                         
subtaskExecutionAttemptDetailsHandler.getMessageHeaders(),
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTriggeringTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTriggeringTest.java
index 66347194648..36ae8f3defe 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTriggeringTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTriggeringTest.java
@@ -43,6 +43,7 @@ import org.apache.flink.util.concurrent.FutureUtils;
 import org.apache.flink.util.concurrent.ManuallyTriggeredScheduledExecutor;
 import org.apache.flink.util.concurrent.ScheduledExecutorServiceAdapter;
 
+import org.assertj.core.api.Assertions;
 import org.junit.Before;
 import org.junit.ClassRule;
 import org.junit.Rule;
@@ -287,6 +288,171 @@ public class CheckpointCoordinatorTriggeringTest extends 
TestLogger {
                 is(CheckpointType.FULL_CHECKPOINT));
     }
 
+    @Test
+    public void testTriggeringCheckpointsWithNullCheckpointType() throws 
Exception {
+        CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway 
gateway =
+                new 
CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway();
+
+        JobVertexID jobVertexID = new JobVertexID();
+        ExecutionGraph graph =
+                new 
CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
+                        .addJobVertex(jobVertexID)
+                        .setTaskManagerGateway(gateway)
+                        .build(EXECUTOR_RESOURCE.getExecutor());
+
+        final StandaloneCompletedCheckpointStore checkpointStore =
+                new StandaloneCompletedCheckpointStore(1);
+        final StandaloneCheckpointIDCounter checkpointIDCounter =
+                new StandaloneCheckpointIDCounter();
+
+        CheckpointCoordinator checkpointCoordinator =
+                createCheckpointCoordinator(graph, checkpointStore, 
checkpointIDCounter);
+
+        checkpointCoordinator.startCheckpointScheduler();
+        gateway.resetCount();
+
+        Assertions.assertThatExceptionOfType(IllegalArgumentException.class)
+                .isThrownBy(() -> 
checkpointCoordinator.triggerCheckpoint(null));
+    }
+
+    @Test
+    public void testTriggeringCheckpointsWithIncrementalCheckpointType() 
throws Exception {
+        CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway 
gateway =
+                new 
CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway();
+
+        JobVertexID jobVertexID = new JobVertexID();
+        ExecutionGraph graph =
+                new 
CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
+                        .addJobVertex(jobVertexID)
+                        .setTaskManagerGateway(gateway)
+                        .build(EXECUTOR_RESOURCE.getExecutor());
+
+        ExecutionVertex vertex = 
graph.getJobVertex(jobVertexID).getTaskVertices()[0];
+        ExecutionAttemptID attemptID = 
vertex.getCurrentExecutionAttempt().getAttemptId();
+
+        final StandaloneCompletedCheckpointStore checkpointStore =
+                new StandaloneCompletedCheckpointStore(1);
+        final StandaloneCheckpointIDCounter checkpointIDCounter =
+                new StandaloneCheckpointIDCounter();
+        CheckpointCoordinator checkpointCoordinator =
+                createCheckpointCoordinator(graph, checkpointStore, 
checkpointIDCounter);
+
+        checkpointCoordinator.startCheckpointScheduler();
+        gateway.resetCount();
+
+        // trigger an incremental type checkpoint
+        final CompletableFuture<CompletedCheckpoint> checkpoint =
+                checkpointCoordinator.triggerCheckpoint(
+                        
org.apache.flink.core.execution.CheckpointType.INCREMENTAL);
+
+        manuallyTriggeredScheduledExecutor.triggerAll();
+        checkpointCoordinator.receiveAcknowledgeMessage(
+                new AcknowledgeCheckpoint(graph.getJobID(), attemptID, 1),
+                TASK_MANAGER_LOCATION_INFO);
+        checkpoint.get();
+        Assertions.assertThat(
+                        gateway.getOnlyTriggeredCheckpoint(attemptID)
+                                .checkpointOptions
+                                .getCheckpointType())
+                .isEqualTo(CheckpointType.CHECKPOINT);
+    }
+
+    @Test
+    public void testTriggeringCheckpointsWithFullCheckpointType() throws 
Exception {
+        CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway 
gateway =
+                new 
CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway();
+
+        JobVertexID jobVertexID = new JobVertexID();
+        ExecutionGraph graph =
+                new 
CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
+                        .addJobVertex(jobVertexID)
+                        .setTaskManagerGateway(gateway)
+                        .build(EXECUTOR_RESOURCE.getExecutor());
+
+        ExecutionVertex vertex = 
graph.getJobVertex(jobVertexID).getTaskVertices()[0];
+        ExecutionAttemptID attemptID = 
vertex.getCurrentExecutionAttempt().getAttemptId();
+
+        final StandaloneCompletedCheckpointStore checkpointStore =
+                new StandaloneCompletedCheckpointStore(1);
+        final StandaloneCheckpointIDCounter checkpointIDCounter =
+                new StandaloneCheckpointIDCounter();
+        CheckpointCoordinator checkpointCoordinator =
+                createCheckpointCoordinator(graph, checkpointStore, 
checkpointIDCounter);
+
+        checkpointCoordinator.startCheckpointScheduler();
+        gateway.resetCount();
+
+        // trigger an full type checkpoint
+        final CompletableFuture<CompletedCheckpoint> checkpoint =
+                checkpointCoordinator.triggerCheckpoint(
+                        org.apache.flink.core.execution.CheckpointType.FULL);
+
+        manuallyTriggeredScheduledExecutor.triggerAll();
+        checkpointCoordinator.receiveAcknowledgeMessage(
+                new AcknowledgeCheckpoint(graph.getJobID(), attemptID, 1),
+                TASK_MANAGER_LOCATION_INFO);
+        checkpoint.get();
+        Assertions.assertThat(
+                        gateway.getOnlyTriggeredCheckpoint(attemptID)
+                                .checkpointOptions
+                                .getCheckpointType())
+                .isEqualTo(CheckpointType.FULL_CHECKPOINT);
+    }
+
+    @Test
+    public void 
testTriggeringCheckpointsWithCheckpointTypeAfterNoClaimSavepoint()
+            throws Exception {
+        CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway 
gateway =
+                new 
CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway();
+
+        JobVertexID jobVertexID = new JobVertexID();
+        ExecutionGraph graph =
+                new 
CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
+                        .addJobVertex(jobVertexID)
+                        .setTaskManagerGateway(gateway)
+                        .build(EXECUTOR_RESOURCE.getExecutor());
+
+        ExecutionVertex vertex = 
graph.getJobVertex(jobVertexID).getTaskVertices()[0];
+        ExecutionAttemptID attemptID = 
vertex.getCurrentExecutionAttempt().getAttemptId();
+
+        // create a savepoint, we can restore from later
+        final CompletedCheckpoint savepoint = takeSavepoint(graph, attemptID);
+
+        // restore from a savepoint in NO_CLAIM mode
+        final StandaloneCompletedCheckpointStore checkpointStore =
+                new StandaloneCompletedCheckpointStore(1);
+        final StandaloneCheckpointIDCounter checkpointIDCounter =
+                new StandaloneCheckpointIDCounter();
+        CheckpointCoordinator checkpointCoordinator =
+                createCheckpointCoordinator(graph, checkpointStore, 
checkpointIDCounter);
+        checkpointCoordinator.restoreSavepoint(
+                SavepointRestoreSettings.forPath(
+                        savepoint.getExternalPointer(), true, 
RestoreMode.NO_CLAIM),
+                graph.getAllVertices(),
+                this.getClass().getClassLoader());
+
+        // trigger a savepoint before any checkpoint completes
+        // next triggered checkpoint should still be a full one
+        takeSavepoint(graph, attemptID, checkpointCoordinator, 2);
+        checkpointCoordinator.startCheckpointScheduler();
+        gateway.resetCount();
+        // the checkpoint should be a FULL_CHECKPOINT even it is specified as 
incremental
+        final CompletableFuture<CompletedCheckpoint> checkpoint =
+                checkpointCoordinator.triggerCheckpoint(
+                        
org.apache.flink.core.execution.CheckpointType.INCREMENTAL);
+        manuallyTriggeredScheduledExecutor.triggerAll();
+        checkpointCoordinator.receiveAcknowledgeMessage(
+                new AcknowledgeCheckpoint(graph.getJobID(), attemptID, 3),
+                TASK_MANAGER_LOCATION_INFO);
+        checkpoint.get();
+
+        Assertions.assertThat(
+                        gateway.getOnlyTriggeredCheckpoint(attemptID)
+                                .checkpointOptions
+                                .getCheckpointType())
+                .isEqualTo(CheckpointType.FULL_CHECKPOINT);
+    }
+
     private CompletedCheckpoint takeSavepoint(ExecutionGraph graph, 
ExecutionAttemptID attemptID)
             throws Exception {
         CheckpointCoordinator checkpointCoordinator =
@@ -324,7 +490,7 @@ public class CheckpointCoordinatorTriggeringTest extends 
TestLogger {
         CheckpointCoordinatorConfiguration checkpointCoordinatorConfiguration =
                 new CheckpointCoordinatorConfigurationBuilder()
                         .setCheckpointInterval(
-                                10000) // periodic is ver long, we trigger 
checkpoint manually
+                                10000) // periodic is very long, we trigger 
checkpoint manually
                         .setCheckpointTimeout(200000) // timeout is very long 
(200 s)
                         .setMaxConcurrentCheckpoints(Integer.MAX_VALUE)
                         .build();
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherCachedOperationsHandlerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherCachedOperationsHandlerTest.java
index 4cc522b7fe2..1272f41e3c9 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherCachedOperationsHandlerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherCachedOperationsHandlerTest.java
@@ -20,8 +20,10 @@ package org.apache.flink.runtime.dispatcher;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple4;
 import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.core.execution.CheckpointType;
 import org.apache.flink.core.execution.SavepointFormatType;
 import org.apache.flink.core.testutils.FlinkMatchers;
 import org.apache.flink.runtime.messages.Acknowledge;
@@ -51,12 +53,15 @@ public class DispatcherCachedOperationsHandlerTest extends 
TestLogger {
 
     private static final Time TIMEOUT = Time.minutes(10);
 
-    private CompletedOperationCache<AsynchronousJobOperationKey, String> cache;
+    private CompletedOperationCache<AsynchronousJobOperationKey, Long> 
checkpointTriggerCache;
+    private CompletedOperationCache<AsynchronousJobOperationKey, String> 
savepointTriggerCache;
     private DispatcherCachedOperationsHandler handler;
 
+    private TriggerCheckpointSpyFunction triggerCheckpointFunction;
     private TriggerSavepointSpyFunction triggerSavepointFunction;
     private TriggerSavepointSpyFunction stopWithSavepointFunction;
 
+    private CompletableFuture<Long> checkpointIdFuture = new 
CompletableFuture<>();
     private CompletableFuture<String> savepointLocationFuture = new 
CompletableFuture<>();
     private final JobID jobID = new JobID();
     private final String targetDirectory = "dummyDirectory";
@@ -64,6 +69,18 @@ public class DispatcherCachedOperationsHandlerTest extends 
TestLogger {
 
     @BeforeEach
     public void setup() {
+
+        checkpointIdFuture = new CompletableFuture<>();
+        triggerCheckpointFunction =
+                TriggerCheckpointSpyFunction.wrap(
+                        new TriggerCheckpointSpyFunction() {
+                            @Override
+                            CompletableFuture<Long> applyWrappedFunction(
+                                    JobID jobID, CheckpointType 
checkpointType, Time timeout) {
+                                return checkpointIdFuture;
+                            }
+                        });
+
         savepointLocationFuture = new CompletableFuture<>();
         triggerSavepointFunction =
                 TriggerSavepointSpyFunction.wrap(
@@ -73,12 +90,21 @@ public class DispatcherCachedOperationsHandlerTest extends 
TestLogger {
                 TriggerSavepointSpyFunction.wrap(
                         (jobID, targetDirectory, formatType, savepointMode, 
timeout) ->
                                 savepointLocationFuture);
-        cache =
+
+        checkpointTriggerCache =
+                new CompletedOperationCache<>(
+                        
RestOptions.ASYNC_OPERATION_STORE_DURATION.defaultValue());
+
+        savepointTriggerCache =
                 new CompletedOperationCache<>(
                         
RestOptions.ASYNC_OPERATION_STORE_DURATION.defaultValue());
         handler =
                 new DispatcherCachedOperationsHandler(
-                        triggerSavepointFunction, stopWithSavepointFunction, 
cache);
+                        triggerCheckpointFunction,
+                        checkpointTriggerCache,
+                        triggerSavepointFunction,
+                        stopWithSavepointFunction,
+                        savepointTriggerCache);
         operationKey = AsynchronousJobOperationKey.of(new TriggerId(), jobID);
     }
 
@@ -165,12 +191,14 @@ public class DispatcherCachedOperationsHandlerTest 
extends TestLogger {
                 .get();
 
         // should not complete because we wait for the result to be accessed
-        assertThat(cache.closeAsync(), 
FlinkMatchers.willNotComplete(Duration.ofMillis(10)));
+        assertThat(
+                savepointTriggerCache.closeAsync(),
+                FlinkMatchers.willNotComplete(Duration.ofMillis(10)));
     }
 
     @Test
     public void throwsIfCacheIsShuttingDown() {
-        cache.closeAsync();
+        savepointTriggerCache.closeAsync();
         assertThrows(
                 IllegalStateException.class,
                 () ->
@@ -208,6 +236,41 @@ public class DispatcherCachedOperationsHandlerTest extends 
TestLogger {
         assertThat(statusFuture, 
futureFailedWith(UnknownOperationKeyException.class));
     }
 
+    private abstract static class TriggerCheckpointSpyFunction
+            implements TriggerCheckpointFunction {
+
+        private final List<Tuple2<JobID, CheckpointType>> invocations = new 
ArrayList<>();
+
+        @Override
+        public CompletableFuture<Long> apply(
+                JobID jobID, CheckpointType checkpointType, Time timeout) {
+            invocations.add(new Tuple2<>(jobID, checkpointType));
+            return applyWrappedFunction(jobID, checkpointType, timeout);
+        }
+
+        abstract CompletableFuture<Long> applyWrappedFunction(
+                JobID jobID, CheckpointType checkpointType, Time timeout);
+
+        public List<Tuple2<JobID, CheckpointType>> getInvocationParameters() {
+            return invocations;
+        }
+
+        public int getNumberOfInvocations() {
+            return invocations.size();
+        }
+
+        public static TriggerCheckpointSpyFunction wrap(
+                TriggerCheckpointSpyFunction wrappedFunction) {
+            return new TriggerCheckpointSpyFunction() {
+                @Override
+                CompletableFuture<Long> applyWrappedFunction(
+                        JobID jobID, CheckpointType checkpointType, Time 
timeout) {
+                    return wrappedFunction.apply(jobID, checkpointType, 
timeout);
+                }
+            };
+        }
+    }
+
     private abstract static class TriggerSavepointSpyFunction implements 
TriggerSavepointFunction {
 
         private final List<Tuple4<JobID, String, SavepointFormatType, 
TriggerSavepointMode>>
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java
index afe38084018..a58cb12c334 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java
@@ -24,10 +24,12 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple4;
 import org.apache.flink.api.java.tuple.Tuple5;
 import org.apache.flink.api.java.tuple.Tuple6;
+import org.apache.flink.core.execution.CheckpointType;
 import org.apache.flink.core.execution.SavepointFormatType;
 import org.apache.flink.queryablestate.KvStateID;
 import org.apache.flink.runtime.blocklist.BlockedNode;
 import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
 import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
@@ -133,7 +135,9 @@ public class TestingJobMasterGateway implements 
JobMasterGateway {
     private final TriFunction<String, Boolean, SavepointFormatType, 
CompletableFuture<String>>
             triggerSavepointFunction;
 
-    @Nonnull private final Supplier<CompletableFuture<String>> 
triggerCheckpointFunction;
+    @Nonnull
+    private final Function<CheckpointType, 
CompletableFuture<CompletedCheckpoint>>
+            triggerCheckpointFunction;
 
     @Nonnull
     private final TriFunction<String, Boolean, SavepointFormatType, 
CompletableFuture<String>>
@@ -236,7 +240,9 @@ public class TestingJobMasterGateway implements 
JobMasterGateway {
             @Nonnull
                     TriFunction<String, Boolean, SavepointFormatType, 
CompletableFuture<String>>
                             triggerSavepointFunction,
-            @Nonnull Supplier<CompletableFuture<String>> 
triggerCheckpointFunction,
+            @Nonnull
+                    Function<CheckpointType, 
CompletableFuture<CompletedCheckpoint>>
+                            triggerCheckpointFunction,
             @Nonnull
                     TriFunction<String, Boolean, SavepointFormatType, 
CompletableFuture<String>>
                             stopWithSavepointFunction,
@@ -412,8 +418,9 @@ public class TestingJobMasterGateway implements 
JobMasterGateway {
     }
 
     @Override
-    public CompletableFuture<String> triggerCheckpoint(Time timeout) {
-        return triggerCheckpointFunction.get();
+    public CompletableFuture<CompletedCheckpoint> triggerCheckpoint(
+            CheckpointType checkpointType, Time timeout) {
+        return triggerCheckpointFunction.apply(checkpointType);
     }
 
     @Override
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGatewayBuilder.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGatewayBuilder.java
index 4560cc38cd5..31b547061bb 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGatewayBuilder.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGatewayBuilder.java
@@ -22,10 +22,12 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.java.tuple.Tuple4;
 import org.apache.flink.api.java.tuple.Tuple5;
 import org.apache.flink.api.java.tuple.Tuple6;
+import org.apache.flink.core.execution.CheckpointType;
 import org.apache.flink.core.execution.SavepointFormatType;
 import org.apache.flink.queryablestate.KvStateID;
 import org.apache.flink.runtime.blocklist.BlockedNode;
 import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
 import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
@@ -125,8 +127,8 @@ public class TestingJobMasterGatewayBuilder {
                                     targetDirectory != null
                                             ? targetDirectory
                                             : UUID.randomUUID().toString());
-    private Supplier<CompletableFuture<String>> triggerCheckpointFunction =
-            () -> 
CompletableFuture.completedFuture(UUID.randomUUID().toString());
+    private Function<CheckpointType, CompletableFuture<CompletedCheckpoint>>
+            triggerCheckpointFunction = (prop) -> new CompletableFuture<>();
     private TriFunction<String, Boolean, SavepointFormatType, 
CompletableFuture<String>>
             stopWithSavepointFunction =
                     (targetDirectory, ignoredB, formatType) ->
@@ -285,7 +287,8 @@ public class TestingJobMasterGatewayBuilder {
     }
 
     public TestingJobMasterGatewayBuilder setTriggerCheckpointFunction(
-            Supplier<CompletableFuture<String>> triggerCheckpointFunction) {
+            Function<CheckpointType, CompletableFuture<CompletedCheckpoint>>
+                    triggerCheckpointFunction) {
         this.triggerCheckpointFunction = triggerCheckpointFunction;
         return this;
     }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointHandlersTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointHandlersTest.java
new file mode 100644
index 00000000000..8152a1ba08b
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointHandlersTest.java
@@ -0,0 +1,345 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job.checkpoints;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.core.execution.CheckpointType;
+import org.apache.flink.runtime.dispatcher.UnknownOperationKeyException;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.rest.RestMatchers;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.HandlerRequestException;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.handler.async.AsynchronousOperationResult;
+import org.apache.flink.runtime.rest.handler.async.OperationResult;
+import org.apache.flink.runtime.rest.handler.job.AsynchronousJobOperationKey;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
+import org.apache.flink.runtime.rest.messages.TriggerId;
+import org.apache.flink.runtime.rest.messages.TriggerIdPathParameter;
+import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointInfo;
+import 
org.apache.flink.runtime.rest.messages.checkpoints.CheckpointStatusMessageParameters;
+import 
org.apache.flink.runtime.rest.messages.checkpoints.CheckpointTriggerMessageParameters;
+import 
org.apache.flink.runtime.rest.messages.checkpoints.CheckpointTriggerRequestBody;
+import org.apache.flink.runtime.rest.messages.queue.QueueStatus;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.TestingRestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.concurrent.FutureUtils;
+
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link CheckpointHandlers}. */
+class CheckpointHandlersTest extends TestLogger {
+
+    private static final Time TIMEOUT = Time.seconds(10);
+
+    private static final JobID JOB_ID = new JobID();
+
+    private static final Long COMPLETED_CHECKPOINT_ID = 123456L;
+
+    private static CheckpointHandlers.CheckpointTriggerHandler 
checkpointTriggerHandler;
+
+    private static CheckpointHandlers.CheckpointStatusHandler 
checkpointStatusHandler;
+
+    @BeforeAll
+    static void setUp() throws Exception {
+        GatewayRetriever<RestfulGateway> leaderRetriever =
+                () -> CompletableFuture.completedFuture(null);
+
+        checkpointTriggerHandler =
+                new CheckpointHandlers.CheckpointTriggerHandler(
+                        leaderRetriever, TIMEOUT, Collections.emptyMap());
+
+        checkpointStatusHandler =
+                new CheckpointHandlers.CheckpointStatusHandler(
+                        leaderRetriever, TIMEOUT, Collections.emptyMap());
+    }
+
+    @Test
+    void testCheckpointTriggerCompletedSuccessfully() throws Exception {
+        final OperationResult<Long> successfulResult =
+                OperationResult.success(COMPLETED_CHECKPOINT_ID);
+        final CompletableFuture<CheckpointType> checkpointPropertiesFuture =
+                new CompletableFuture<>();
+
+        final AtomicReference<AsynchronousJobOperationKey> keyReference = new 
AtomicReference<>();
+        final TestingRestfulGateway testingRestfulGateway =
+                new TestingRestfulGateway.Builder()
+                        .setTriggerCheckpointFunction(
+                                (AsynchronousJobOperationKey key,
+                                        CheckpointType checkpointType) -> {
+                                    keyReference.set(key);
+                                    
checkpointPropertiesFuture.complete(checkpointType);
+                                    return 
CompletableFuture.completedFuture(Acknowledge.get());
+                                })
+                        .setGetCheckpointStatusFunction(
+                                (AsynchronousJobOperationKey operationKey) -> {
+                                    if 
(operationKey.equals(keyReference.get())) {
+                                        return 
CompletableFuture.completedFuture(successfulResult);
+                                    }
+                                    throw new RuntimeException(
+                                            "Expected operation key "
+                                                    + keyReference.get()
+                                                    + ", but received "
+                                                    + operationKey);
+                                })
+                        .build();
+
+        final CheckpointType checkpointType = CheckpointType.FULL;
+
+        final TriggerId triggerId =
+                checkpointTriggerHandler
+                        .handleRequest(
+                                triggerCheckpointRequest(checkpointType, null),
+                                testingRestfulGateway)
+                        .get()
+                        .getTriggerId();
+
+        final AsynchronousOperationResult<CheckpointInfo> 
checkpointTriggerResponseBody =
+                checkpointStatusHandler
+                        .handleRequest(
+                                checkpointTriggerStatusRequest(triggerId), 
testingRestfulGateway)
+                        .get();
+
+        assertThat(checkpointTriggerResponseBody.queueStatus().getId())
+                .isEqualTo(QueueStatus.Id.COMPLETED);
+        assertThat(checkpointTriggerResponseBody.resource()).isNotNull();
+        assertThat(checkpointTriggerResponseBody.resource().getCheckpointId())
+                .isEqualTo(COMPLETED_CHECKPOINT_ID);
+        
assertThat(checkpointPropertiesFuture.get()).isEqualTo(CheckpointType.FULL);
+    }
+
+    @Test
+    void testTriggerCheckpointNoCheckpointType() throws Exception {
+        final OperationResult<Long> successfulResult =
+                OperationResult.success(COMPLETED_CHECKPOINT_ID);
+        final CompletableFuture<CheckpointType> checkpointTypeFuture = new 
CompletableFuture<>();
+
+        final AtomicReference<AsynchronousJobOperationKey> keyReference = new 
AtomicReference<>();
+        final TestingRestfulGateway testingRestfulGateway =
+                new TestingRestfulGateway.Builder()
+                        .setTriggerCheckpointFunction(
+                                (AsynchronousJobOperationKey key,
+                                        CheckpointType checkpointType) -> {
+                                    keyReference.set(key);
+                                    
checkpointTypeFuture.complete(checkpointType);
+                                    return 
CompletableFuture.completedFuture(Acknowledge.get());
+                                })
+                        .setGetCheckpointStatusFunction(
+                                (AsynchronousJobOperationKey operationKey) -> {
+                                    if 
(operationKey.equals(keyReference.get())) {
+                                        return 
CompletableFuture.completedFuture(successfulResult);
+                                    }
+                                    throw new RuntimeException(
+                                            "Expected operation key "
+                                                    + keyReference.get()
+                                                    + ", but received "
+                                                    + operationKey);
+                                })
+                        .build();
+
+        final TriggerId triggerId =
+                checkpointTriggerHandler
+                        .handleRequest(triggerCheckpointRequest(null, null), 
testingRestfulGateway)
+                        .get()
+                        .getTriggerId();
+
+        AsynchronousOperationResult<CheckpointInfo> 
checkpointTriggerResponseBody;
+        checkpointTriggerResponseBody =
+                checkpointStatusHandler
+                        .handleRequest(
+                                checkpointTriggerStatusRequest(triggerId), 
testingRestfulGateway)
+                        .get();
+
+        assertThat(checkpointTriggerResponseBody.queueStatus().getId())
+                .isEqualTo(QueueStatus.Id.COMPLETED);
+        assertThat(checkpointTriggerResponseBody.resource()).isNotNull();
+        assertThat(checkpointTriggerResponseBody.resource().getCheckpointId())
+                .isEqualTo(COMPLETED_CHECKPOINT_ID);
+        
assertThat(checkpointTypeFuture.get()).isEqualTo(CheckpointType.DEFAULT);
+    }
+
+    @Test
+    void testCheckpointCompletedWithException() throws Exception {
+        final OperationResult<Long> failedResult =
+                OperationResult.failure(new RuntimeException("expected"));
+
+        final AtomicReference<AsynchronousJobOperationKey> keyReference = new 
AtomicReference<>();
+        final TestingRestfulGateway testingRestfulGateway =
+                new TestingRestfulGateway.Builder()
+                        .setTriggerCheckpointFunction(
+                                (AsynchronousJobOperationKey key,
+                                        CheckpointType checkpointType) -> {
+                                    keyReference.set(key);
+                                    return 
CompletableFuture.completedFuture(Acknowledge.get());
+                                })
+                        .setGetCheckpointStatusFunction(
+                                (AsynchronousJobOperationKey operationKey) -> {
+                                    if 
(operationKey.equals(keyReference.get())) {
+                                        return 
CompletableFuture.completedFuture(failedResult);
+                                    }
+                                    throw new RuntimeException(
+                                            "Expected operation key "
+                                                    + keyReference.get()
+                                                    + ", but received "
+                                                    + operationKey);
+                                })
+                        .build();
+
+        final TriggerId triggerId =
+                checkpointTriggerHandler
+                        .handleRequest(triggerCheckpointRequest(null, null), 
testingRestfulGateway)
+                        .get()
+                        .getTriggerId();
+
+        AsynchronousOperationResult<CheckpointInfo> 
checkpointTriggerResponseBody;
+        checkpointTriggerResponseBody =
+                checkpointStatusHandler
+                        .handleRequest(
+                                checkpointTriggerStatusRequest(triggerId), 
testingRestfulGateway)
+                        .get();
+
+        assertThat(checkpointTriggerResponseBody.queueStatus().getId())
+                .isEqualTo(QueueStatus.Id.COMPLETED);
+        assertThat(checkpointTriggerResponseBody.resource()).isNotNull();
+        
assertThat(checkpointTriggerResponseBody.resource().getFailureCause()).isNotNull();
+
+        final Throwable checkpointError =
+                checkpointTriggerResponseBody
+                        .resource()
+                        .getFailureCause()
+                        .deserializeError(ClassLoader.getSystemClassLoader());
+        assertThat(checkpointError.getMessage()).matches("expected");
+        assertThat(checkpointError).isInstanceOf(RuntimeException.class);
+    }
+
+    @Test
+    void testProvidedTriggerId() throws Exception {
+        final OperationResult<Long> successfulResult =
+                OperationResult.success(COMPLETED_CHECKPOINT_ID);
+        final AtomicReference<AsynchronousJobOperationKey> keyReference = new 
AtomicReference<>();
+        final TestingRestfulGateway testingRestfulGateway =
+                new TestingRestfulGateway.Builder()
+                        .setTriggerCheckpointFunction(
+                                (AsynchronousJobOperationKey key,
+                                        CheckpointType checkpointType) -> {
+                                    keyReference.set(key);
+                                    return 
CompletableFuture.completedFuture(Acknowledge.get());
+                                })
+                        .setGetCheckpointStatusFunction(
+                                (AsynchronousJobOperationKey operationKey) -> {
+                                    if 
(operationKey.equals(keyReference.get())) {
+                                        return 
CompletableFuture.completedFuture(successfulResult);
+                                    }
+                                    throw new RuntimeException(
+                                            "Expected operation key "
+                                                    + keyReference.get()
+                                                    + ", but received "
+                                                    + operationKey);
+                                })
+                        .build();
+
+        final TriggerId providedTriggerId = new TriggerId();
+
+        final TriggerId returnedTriggerId =
+                checkpointTriggerHandler
+                        .handleRequest(
+                                triggerCheckpointRequest(CheckpointType.FULL, 
providedTriggerId),
+                                testingRestfulGateway)
+                        .get()
+                        .getTriggerId();
+
+        assertThat(providedTriggerId).isEqualTo(returnedTriggerId);
+
+        AsynchronousOperationResult<CheckpointInfo> 
checkpointTriggerResponseBody;
+        checkpointTriggerResponseBody =
+                checkpointStatusHandler
+                        .handleRequest(
+                                
checkpointTriggerStatusRequest(providedTriggerId),
+                                testingRestfulGateway)
+                        .get();
+
+        assertThat(checkpointTriggerResponseBody.queueStatus().getId())
+                .isEqualTo(QueueStatus.Id.COMPLETED);
+        assertThat(checkpointTriggerResponseBody.resource()).isNotNull();
+        assertThat(checkpointTriggerResponseBody.resource().getCheckpointId())
+                .isEqualTo(COMPLETED_CHECKPOINT_ID);
+    }
+
+    @Test
+    void testQueryStatusOfUnknownOperationReturnsError()
+            throws HandlerRequestException, RestHandlerException {
+
+        final TestingRestfulGateway testingRestfulGateway =
+                new TestingRestfulGateway.Builder()
+                        .setGetCheckpointStatusFunction(
+                                key ->
+                                        FutureUtils.completedExceptionally(
+                                                new 
UnknownOperationKeyException(key)))
+                        .build();
+
+        final CompletableFuture<AsynchronousOperationResult<CheckpointInfo>> 
statusFuture =
+                checkpointStatusHandler.handleRequest(
+                        checkpointTriggerStatusRequest(new TriggerId()), 
testingRestfulGateway);
+
+        assertThat(statusFuture)
+                
.matches(RestMatchers.respondsWithError(HttpResponseStatus.NOT_FOUND)::matches);
+    }
+
+    private static HandlerRequest<CheckpointTriggerRequestBody> 
triggerCheckpointRequest(
+            final CheckpointType checkpointType, @Nullable final TriggerId 
triggerId)
+            throws HandlerRequestException {
+        return HandlerRequest.resolveParametersAndCreate(
+                new CheckpointTriggerRequestBody(checkpointType, triggerId),
+                new CheckpointTriggerMessageParameters(),
+                Collections.singletonMap(JobIDPathParameter.KEY, 
JOB_ID.toString()),
+                Collections.emptyMap(),
+                Collections.emptyList());
+    }
+
+    private static HandlerRequest<EmptyRequestBody> 
checkpointTriggerStatusRequest(
+            final TriggerId triggerId) throws HandlerRequestException {
+        final Map<String, String> pathParameters = new HashMap<>();
+        pathParameters.put(JobIDPathParameter.KEY, JOB_ID.toString());
+        pathParameters.put(TriggerIdPathParameter.KEY, triggerId.toString());
+
+        return HandlerRequest.resolveParametersAndCreate(
+                EmptyRequestBody.getInstance(),
+                new CheckpointStatusMessageParameters(),
+                pathParameters,
+                Collections.emptyMap(),
+                Collections.emptyList());
+    }
+}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingSchedulerNG.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingSchedulerNG.java
index 43386a2e705..c54511b7b3b 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingSchedulerNG.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingSchedulerNG.java
@@ -20,10 +20,12 @@ package org.apache.flink.runtime.scheduler;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.core.execution.CheckpointType;
 import org.apache.flink.core.execution.SavepointFormatType;
 import org.apache.flink.queryablestate.KvStateID;
 import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
 import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
 import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
@@ -50,6 +52,7 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.concurrent.CompletableFuture;
 import java.util.function.Consumer;
+import java.util.function.Function;
 import java.util.function.Supplier;
 
 /** Testing implementation of the {@link SchedulerNG}. */
@@ -59,7 +62,8 @@ public class TestingSchedulerNG implements SchedulerNG {
     private final Supplier<CompletableFuture<Void>> closeAsyncSupplier;
     private final TriFunction<String, Boolean, SavepointFormatType, 
CompletableFuture<String>>
             triggerSavepointFunction;
-    private final Supplier<CompletableFuture<String>> 
triggerCheckpointFunction;
+    private final Function<CheckpointType, 
CompletableFuture<CompletedCheckpoint>>
+            triggerCheckpointFunction;
     private final Consumer<Throwable> handleGlobalFailureConsumer;
 
     private TestingSchedulerNG(
@@ -68,7 +72,8 @@ public class TestingSchedulerNG implements SchedulerNG {
             Supplier<CompletableFuture<Void>> closeAsyncSupplier,
             TriFunction<String, Boolean, SavepointFormatType, 
CompletableFuture<String>>
                     triggerSavepointFunction,
-            Supplier<CompletableFuture<String>> triggerCheckpointFunction,
+            Function<CheckpointType, CompletableFuture<CompletedCheckpoint>>
+                    triggerCheckpointFunction,
             Consumer<Throwable> handleGlobalFailureConsumer) {
         this.jobTerminationFuture = jobTerminationFuture;
         this.startSchedulingRunnable = startSchedulingRunnable;
@@ -181,8 +186,8 @@ public class TestingSchedulerNG implements SchedulerNG {
     }
 
     @Override
-    public CompletableFuture<String> triggerCheckpoint() {
-        return triggerCheckpointFunction.get();
+    public CompletableFuture<CompletedCheckpoint> 
triggerCheckpoint(CheckpointType checkpointType) {
+        return triggerCheckpointFunction.apply(checkpointType);
     }
 
     @Override
@@ -240,8 +245,8 @@ public class TestingSchedulerNG implements SchedulerNG {
         private TriFunction<String, Boolean, SavepointFormatType, 
CompletableFuture<String>>
                 triggerSavepointFunction =
                         (ignoredA, ignoredB, formatType) -> new 
CompletableFuture<>();
-        private Supplier<CompletableFuture<String>> triggerCheckpointFunction =
-                CompletableFuture::new;
+        private Function<CheckpointType, 
CompletableFuture<CompletedCheckpoint>>
+                triggerCheckpointFunction = (ignored) -> new 
CompletableFuture<>();
         private Consumer<Throwable> handleGlobalFailureConsumer = (ignored) -> 
{};
 
         public Builder setJobTerminationFuture(CompletableFuture<JobStatus> 
jobTerminationFuture) {
@@ -267,7 +272,8 @@ public class TestingSchedulerNG implements SchedulerNG {
         }
 
         public Builder setTriggerCheckpointFunction(
-                Supplier<CompletableFuture<String>> triggerCheckpointFunction) 
{
+                Function<CheckpointType, 
CompletableFuture<CompletedCheckpoint>>
+                        triggerCheckpointFunction) {
             this.triggerCheckpointFunction = triggerCheckpointFunction;
             return this;
         }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingDispatcherGateway.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingDispatcherGateway.java
index 343002648af..7136620a907 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingDispatcherGateway.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingDispatcherGateway.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.execution.CheckpointType;
 import org.apache.flink.core.execution.SavepointFormatType;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
@@ -49,6 +50,7 @@ import org.apache.flink.util.function.TriFunction;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.concurrent.CompletableFuture;
+import java.util.function.BiFunction;
 import java.util.function.Function;
 import java.util.function.Supplier;
 
@@ -127,6 +129,10 @@ public final class TestingDispatcherGateway extends 
TestingRestfulGateway
             Supplier<CompletableFuture<Collection<Tuple2<ResourceID, String>>>>
                     requestTaskManagerMetricQueryServiceGatewaysSupplier,
             Supplier<CompletableFuture<ThreadDumpInfo>> 
requestThreadDumpSupplier,
+            BiFunction<AsynchronousJobOperationKey, CheckpointType, 
CompletableFuture<Acknowledge>>
+                    triggerCheckpointFunction,
+            Function<AsynchronousJobOperationKey, 
CompletableFuture<OperationResult<Long>>>
+                    getCheckpointStatusFunction,
             TriFunction<
                             AsynchronousJobOperationKey,
                             String,
@@ -174,6 +180,8 @@ public final class TestingDispatcherGateway extends 
TestingRestfulGateway
                 requestMetricQueryServiceAddressesSupplier,
                 requestTaskManagerMetricQueryServiceGatewaysSupplier,
                 requestThreadDumpSupplier,
+                triggerCheckpointFunction,
+                getCheckpointStatusFunction,
                 triggerSavepointFunction,
                 stopWithSavepointFunction,
                 getSavepointStatusFunction,
@@ -354,6 +362,8 @@ public final class TestingDispatcherGateway extends 
TestingRestfulGateway
                     requestMetricQueryServiceGatewaysSupplier,
                     requestTaskManagerMetricQueryServiceGatewaysSupplier,
                     requestThreadDumpSupplier,
+                    triggerCheckpointFunction,
+                    getCheckpointStatusFunction,
                     triggerSavepointFunction,
                     triggerSavepointAndGetLocationFunction,
                     stopWithSavepointFunction,
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingRestfulGateway.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingRestfulGateway.java
index ffd53db2b9b..997a1fe6865 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingRestfulGateway.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingRestfulGateway.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.execution.CheckpointType;
 import org.apache.flink.core.execution.SavepointFormatType;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.dispatcher.TriggerSavepointMode;
@@ -45,6 +46,7 @@ import org.apache.flink.util.function.TriFunction;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.concurrent.CompletableFuture;
+import java.util.function.BiFunction;
 import java.util.function.Function;
 import java.util.function.Supplier;
 
@@ -85,6 +87,16 @@ public class TestingRestfulGateway implements RestfulGateway 
{
             () -> FutureUtils.completedExceptionally(new 
UnsupportedOperationException());
     static final Supplier<CompletableFuture<Acknowledge>> 
DEFAULT_CLUSTER_SHUTDOWN_SUPPLIER =
             () -> CompletableFuture.completedFuture(Acknowledge.get());
+    static final BiFunction<
+                    AsynchronousJobOperationKey, CheckpointType, 
CompletableFuture<Acknowledge>>
+            DEFAULT_TRIGGER_CHECKPOINT_FUNCTION =
+                    (AsynchronousJobOperationKey operationKey, CheckpointType 
checkpointType) ->
+                            FutureUtils.completedExceptionally(new 
UnsupportedOperationException());
+    static final Function<AsynchronousJobOperationKey, 
CompletableFuture<OperationResult<Long>>>
+            DEFAULT_GET_CHECKPOINT_STATUS_FUNCTION =
+                    (AsynchronousJobOperationKey operationKey) ->
+                            FutureUtils.completedExceptionally(new 
UnsupportedOperationException());
+
     static final TriFunction<
                     AsynchronousJobOperationKey,
                     String,
@@ -152,6 +164,13 @@ public class TestingRestfulGateway implements 
RestfulGateway {
 
     protected Supplier<CompletableFuture<ThreadDumpInfo>> 
requestThreadDumpSupplier;
 
+    protected BiFunction<
+                    AsynchronousJobOperationKey, CheckpointType, 
CompletableFuture<Acknowledge>>
+            triggerCheckpointFunction;
+
+    protected Function<AsynchronousJobOperationKey, 
CompletableFuture<OperationResult<Long>>>
+            getCheckpointStatusFunction;
+
     protected TriFunction<
                     AsynchronousJobOperationKey,
                     String,
@@ -190,6 +209,8 @@ public class TestingRestfulGateway implements 
RestfulGateway {
                 DEFAULT_REQUEST_METRIC_QUERY_SERVICE_PATHS_SUPPLIER,
                 
DEFAULT_REQUEST_TASK_MANAGER_METRIC_QUERY_SERVICE_PATHS_SUPPLIER,
                 DEFAULT_REQUEST_THREAD_DUMP_SUPPLIER,
+                DEFAULT_TRIGGER_CHECKPOINT_FUNCTION,
+                DEFAULT_GET_CHECKPOINT_STATUS_FUNCTION,
                 DEFAULT_TRIGGER_SAVEPOINT_FUNCTION,
                 DEFAULT_STOP_WITH_SAVEPOINT_FUNCTION,
                 DEFAULT_GET_SAVEPOINT_STATUS_FUNCTION,
@@ -213,6 +234,10 @@ public class TestingRestfulGateway implements 
RestfulGateway {
             Supplier<CompletableFuture<Collection<Tuple2<ResourceID, String>>>>
                     requestTaskManagerMetricQueryServiceAddressesSupplier,
             Supplier<CompletableFuture<ThreadDumpInfo>> 
requestThreadDumpSupplier,
+            BiFunction<AsynchronousJobOperationKey, CheckpointType, 
CompletableFuture<Acknowledge>>
+                    triggerCheckpointFunction,
+            Function<AsynchronousJobOperationKey, 
CompletableFuture<OperationResult<Long>>>
+                    getCheckpointStatusFunction,
             TriFunction<
                             AsynchronousJobOperationKey,
                             String,
@@ -248,6 +273,8 @@ public class TestingRestfulGateway implements 
RestfulGateway {
         this.requestTaskManagerMetricQueryServiceAddressesSupplier =
                 requestTaskManagerMetricQueryServiceAddressesSupplier;
         this.requestThreadDumpSupplier = requestThreadDumpSupplier;
+        this.triggerCheckpointFunction = triggerCheckpointFunction;
+        this.getCheckpointStatusFunction = getCheckpointStatusFunction;
         this.triggerSavepointFunction = triggerSavepointFunction;
         this.stopWithSavepointFunction = stopWithSavepointFunction;
         this.getSavepointStatusFunction = getSavepointStatusFunction;
@@ -313,6 +340,18 @@ public class TestingRestfulGateway implements 
RestfulGateway {
         return null;
     }
 
+    @Override
+    public CompletableFuture<Acknowledge> triggerCheckpoint(
+            AsynchronousJobOperationKey operationKey, CheckpointType 
checkpointType, Time timeout) {
+        return triggerCheckpointFunction.apply(operationKey, checkpointType);
+    }
+
+    @Override
+    public CompletableFuture<OperationResult<Long>> 
getTriggeredCheckpointStatus(
+            AsynchronousJobOperationKey operationKey) {
+        return getCheckpointStatusFunction.apply(operationKey);
+    }
+
     @Override
     public CompletableFuture<Acknowledge> triggerSavepoint(
             AsynchronousJobOperationKey operationKey,
@@ -383,6 +422,11 @@ public class TestingRestfulGateway implements 
RestfulGateway {
                 requestTaskManagerMetricQueryServiceGatewaysSupplier;
         protected Supplier<CompletableFuture<ThreadDumpInfo>> 
requestThreadDumpSupplier;
         protected Supplier<CompletableFuture<Acknowledge>> 
clusterShutdownSupplier;
+        protected BiFunction<
+                        AsynchronousJobOperationKey, CheckpointType, 
CompletableFuture<Acknowledge>>
+                triggerCheckpointFunction;
+        protected Function<AsynchronousJobOperationKey, 
CompletableFuture<OperationResult<Long>>>
+                getCheckpointStatusFunction;
         protected TriFunction<
                         AsynchronousJobOperationKey,
                         String,
@@ -416,6 +460,8 @@ public class TestingRestfulGateway implements 
RestfulGateway {
                     DEFAULT_REQUEST_METRIC_QUERY_SERVICE_PATHS_SUPPLIER;
             requestTaskManagerMetricQueryServiceGatewaysSupplier =
                     
DEFAULT_REQUEST_TASK_MANAGER_METRIC_QUERY_SERVICE_PATHS_SUPPLIER;
+            triggerCheckpointFunction = DEFAULT_TRIGGER_CHECKPOINT_FUNCTION;
+            getCheckpointStatusFunction = 
DEFAULT_GET_CHECKPOINT_STATUS_FUNCTION;
             triggerSavepointFunction = DEFAULT_TRIGGER_SAVEPOINT_FUNCTION;
             stopWithSavepointFunction = DEFAULT_STOP_WITH_SAVEPOINT_FUNCTION;
             getSavepointStatusFunction = DEFAULT_GET_SAVEPOINT_STATUS_FUNCTION;
@@ -500,6 +546,23 @@ public class TestingRestfulGateway implements 
RestfulGateway {
             return self();
         }
 
+        public T setTriggerCheckpointFunction(
+                BiFunction<
+                                AsynchronousJobOperationKey,
+                                CheckpointType,
+                                CompletableFuture<Acknowledge>>
+                        triggerCheckpointFunction) {
+            this.triggerCheckpointFunction = triggerCheckpointFunction;
+            return self();
+        }
+
+        public T setGetCheckpointStatusFunction(
+                Function<AsynchronousJobOperationKey, 
CompletableFuture<OperationResult<Long>>>
+                        getCheckpointStatusFunction) {
+            this.getCheckpointStatusFunction = getCheckpointStatusFunction;
+            return self();
+        }
+
         public T setTriggerSavepointFunction(
                 TriFunction<
                                 AsynchronousJobOperationKey,
@@ -569,6 +632,8 @@ public class TestingRestfulGateway implements 
RestfulGateway {
                     requestMetricQueryServiceGatewaysSupplier,
                     requestTaskManagerMetricQueryServiceGatewaysSupplier,
                     requestThreadDumpSupplier,
+                    triggerCheckpointFunction,
+                    getCheckpointStatusFunction,
                     triggerSavepointFunction,
                     stopWithSavepointFunction,
                     getSavepointStatusFunction,

Reply via email to