This is an automated email from the ASF dual-hosted git repository. chesnay pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 5d66e82915e [FLINK-27101][checkpointing][rest] Add restful API to trigger checkpoints 5d66e82915e is described below commit 5d66e82915eace9342c175163b17f610bfbf7fa4 Author: Jiale <jiale....@vungle.com> AuthorDate: Thu Oct 13 11:54:29 2022 -0700 [FLINK-27101][checkpointing][rest] Add restful API to trigger checkpoints --- .../shortcodes/generated/rest_v1_dispatcher.html | 141 +++++++++ docs/static/generated/rest_v1_dispatcher.yml | 70 +++++ .../flink/core/execution/CheckpointType.java | 48 +++ .../src/test/resources/rest_api_v1.snapshot | 73 +++++ .../runtime/checkpoint/CheckpointCoordinator.java | 46 ++- .../flink/runtime/checkpoint/CheckpointType.java | 3 + .../flink/runtime/dispatcher/Dispatcher.java | 25 ++ .../DispatcherCachedOperationsHandler.java | 35 +++ .../dispatcher/DispatcherOperationCaches.java | 13 +- .../dispatcher/TriggerCheckpointFunction.java | 34 ++ .../apache/flink/runtime/jobmaster/JobMaster.java | 13 +- .../flink/runtime/jobmaster/JobMasterGateway.java | 17 +- .../job/checkpoints/CheckpointHandlers.java | 257 +++++++++++++++ .../CheckpointInfo.java} | 31 +- .../checkpoints/CheckpointStatusHeaders.java | 81 +++++ .../CheckpointStatusMessageParameters.java | 48 +++ .../checkpoints/CheckpointTriggerHeaders.java | 77 +++++ .../CheckpointTriggerMessageParameters.java | 43 +++ .../checkpoints/CheckpointTriggerRequestBody.java | 63 ++++ .../messages/job/savepoints/SavepointInfo.java | 3 + .../flink/runtime/scheduler/SchedulerBase.java | 6 +- .../flink/runtime/scheduler/SchedulerNG.java | 4 +- .../scheduler/adaptive/AdaptiveScheduler.java | 7 +- .../adaptive/StateWithExecutionGraph.java | 6 +- .../flink/runtime/webmonitor/RestfulGateway.java | 30 ++ .../runtime/webmonitor/WebMonitorEndpoint.java | 13 + .../CheckpointCoordinatorTriggeringTest.java | 168 +++++++++- .../DispatcherCachedOperationsHandlerTest.java | 73 ++++- .../jobmaster/utils/TestingJobMasterGateway.java | 15 +- 
.../utils/TestingJobMasterGatewayBuilder.java | 9 +- .../job/checkpoints/CheckpointHandlersTest.java | 345 +++++++++++++++++++++ .../runtime/scheduler/TestingSchedulerNG.java | 20 +- .../webmonitor/TestingDispatcherGateway.java | 10 + .../runtime/webmonitor/TestingRestfulGateway.java | 65 ++++ 34 files changed, 1841 insertions(+), 51 deletions(-) diff --git a/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html b/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html index f9cfafd5954..93122afd75c 100644 --- a/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html +++ b/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html @@ -1776,6 +1776,68 @@ Using 'curl' you can upload a jar via 'curl -X POST -H "Expect:" -F "jarfile=@pa </tr> </tbody> </table> +<table class="rest-api table table-bordered"> + <tbody> + <tr> + <td class="text-left" colspan="2"><h5><strong>/jobs/:jobid/checkpoints</strong></h5></td> + </tr> + <tr> + <td class="text-left" style="width: 20%">Verb: <code>POST</code></td> + <td class="text-left">Response code: <code>202 Accepted</code></td> + </tr> + <tr> + <td colspan="2">Triggers a checkpoint. 
This async operation would return a 'triggerid' for further query identifier.</td> + </tr> + <tr> + <td colspan="2">Path parameters</td> + </tr> + <tr> + <td colspan="2"> + <ul> +<li><code>jobid</code> - 32-character hexadecimal string value that identifies a job.</li> + </ul> + </td> + </tr> + <tr> + <td colspan="2"> + <label> + <details> + <summary>Request</summary> + <pre><code>{ + "type" : "object", + "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:checkpoints:CheckpointTriggerRequestBody", + "properties" : { + "checkpointType" : { + "type" : "string", + "enum" : [ "CONFIGURED", "FULL", "INCREMENTAL" ] + }, + "triggerId" : { + "type" : "any" + } + } +}</code></pre> + </label> + </td> + </tr> + <tr> + <td colspan="2"> + <label> + <details> + <summary>Response</summary> + <pre><code>{ + "type" : "object", + "id" : "urn:jsonschema:org:apache:flink:runtime:rest:handler:async:TriggerResponse", + "properties" : { + "request-id" : { + "type" : "any" + } + } +}</code></pre> + </label> + </td> + </tr> + </tbody> +</table> <table class="rest-api table table-bordered"> <tbody> <tr> @@ -2194,6 +2256,77 @@ Using 'curl' you can upload a jar via 'curl -X POST -H "Expect:" -F "jarfile=@pa </tr> </tbody> </table> +<table class="rest-api table table-bordered"> + <tbody> + <tr> + <td class="text-left" colspan="2"><h5><strong>/jobs/:jobid/checkpoints/:triggerid</strong></h5></td> + </tr> + <tr> + <td class="text-left" style="width: 20%">Verb: <code>GET</code></td> + <td class="text-left">Response code: <code>200 OK</code></td> + </tr> + <tr> + <td colspan="2">Returns the status of a checkpoint trigger operation.</td> + </tr> + <tr> + <td colspan="2">Path parameters</td> + </tr> + <tr> + <td colspan="2"> + <ul> +<li><code>jobid</code> - 32-character hexadecimal string value that identifies a job.</li> +<li><code>triggerid</code> - 32-character hexadecimal string that identifies an asynchronous operation trigger ID. 
The ID was returned then the operation was triggered.</li> + </ul> + </td> + </tr> + <tr> + <td colspan="2"> + <label> + <details> + <summary>Request</summary> + <pre><code>{}</code></pre> + </label> + </td> + </tr> + <tr> + <td colspan="2"> + <label> + <details> + <summary>Response</summary> + <pre><code>{ + "type" : "object", + "id" : "urn:jsonschema:org:apache:flink:runtime:rest:handler:async:AsynchronousOperationResult", + "properties" : { + "operation" : { + "type" : "object", + "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:checkpoints:CheckpointInfo", + "properties" : { + "checkpointId" : { + "type" : "integer" + }, + "failureCause" : { + "type" : "any" + } + } + }, + "status" : { + "type" : "object", + "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:queue:QueueStatus", + "properties" : { + "id" : { + "type" : "string", + "required" : true, + "enum" : [ "IN_PROGRESS", "COMPLETED" ] + } + } + } + } +}</code></pre> + </label> + </td> + </tr> + </tbody> +</table> <table class="rest-api table table-bordered"> <tbody> <tr> @@ -3467,6 +3600,10 @@ Using 'curl' you can upload a jar via 'curl -X POST -H "Expect:" -F "jarfile=@pa "type" : "string", "enum" : [ "ok", "low", "high" ] }, + "backpressureLevel" : { + "type" : "string", + "enum" : [ "ok", "low", "high" ] + }, "end-timestamp" : { "type" : "integer" }, @@ -3487,6 +3624,10 @@ Using 'curl' you can upload a jar via 'curl -X POST -H "Expect:" -F "jarfile=@pa "type" : "string", "enum" : [ "ok", "low", "high" ] }, + "backpressureLevel" : { + "type" : "string", + "enum" : [ "ok", "low", "high" ] + }, "busyRatio" : { "type" : "number" }, diff --git a/docs/static/generated/rest_v1_dispatcher.yml b/docs/static/generated/rest_v1_dispatcher.yml index 60b22473cb3..4d7a520d316 100644 --- a/docs/static/generated/rest_v1_dispatcher.yml +++ b/docs/static/generated/rest_v1_dispatcher.yml @@ -495,6 +495,29 @@ paths: application/json: schema: $ref: '#/components/schemas/CheckpointingStatistics' + 
post: + description: Triggers a checkpoint. This async operation would return a 'triggerid' + for further query identifier. + operationId: triggerCheckpoint + parameters: + - name: jobid + in: path + description: 32-character hexadecimal string value that identifies a job. + required: true + schema: + $ref: '#/components/schemas/JobID' + requestBody: + content: + application/json: + schema: + $ref: '#/components/schemas/CheckpointTriggerRequestBody' + responses: + "202": + description: The request was successful. + content: + application/json: + schema: + $ref: '#/components/schemas/TriggerResponse' /jobs/{jobid}/checkpoints/config: get: description: Returns the checkpointing configuration. @@ -569,6 +592,31 @@ paths: application/json: schema: $ref: '#/components/schemas/TaskCheckpointStatisticsWithSubtaskDetails' + /jobs/{jobid}/checkpoints/{triggerid}: + get: + description: Returns the status of a checkpoint trigger operation. + operationId: getCheckpointStatus + parameters: + - name: jobid + in: path + description: 32-character hexadecimal string value that identifies a job. + required: true + schema: + $ref: '#/components/schemas/JobID' + - name: triggerid + in: path + description: 32-character hexadecimal string that identifies an asynchronous + operation trigger ID. The ID was returned then the operation was triggered. + required: true + schema: + $ref: '#/components/schemas/TriggerId' + responses: + "200": + description: The request was successful. + content: + application/json: + schema: + $ref: '#/components/schemas/AsynchronousOperationResult' /jobs/{jobid}/config: get: description: Returns the configuration of a job. 
@@ -1559,6 +1607,7 @@ components: operation: oneOf: - $ref: '#/components/schemas/AsynchronousOperationInfo' + - $ref: '#/components/schemas/CheckpointInfo' - $ref: '#/components/schemas/SavepointInfo' status: $ref: '#/components/schemas/QueueStatus' @@ -1626,6 +1675,14 @@ components: sync: type: integer format: int64 + CheckpointInfo: + type: object + properties: + checkpointId: + type: integer + format: int64 + failureCause: + $ref: '#/components/schemas/SerializedThrowable' CheckpointStatistics: required: - className @@ -1684,6 +1741,19 @@ components: - IN_PROGRESS - COMPLETED - FAILED + CheckpointTriggerRequestBody: + type: object + properties: + checkpointType: + $ref: '#/components/schemas/CheckpointType' + triggerId: + $ref: '#/components/schemas/TriggerId' + CheckpointType: + type: string + enum: + - CONFIGURED + - FULL + - INCREMENTAL CheckpointingStatistics: type: object properties: diff --git a/flink-core/src/main/java/org/apache/flink/core/execution/CheckpointType.java b/flink-core/src/main/java/org/apache/flink/core/execution/CheckpointType.java new file mode 100644 index 00000000000..521846b174e --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/core/execution/CheckpointType.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.core.execution; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.configuration.DescribedEnum; +import org.apache.flink.configuration.description.InlineElement; + +import static org.apache.flink.configuration.description.TextElement.text; + +/** Describes the type in which a checkpoint should be taken. */ +@PublicEvolving +public enum CheckpointType implements DescribedEnum { + CONFIGURED("The checkpoint type derived from the job config"), + + FULL("A checkpoint type that checkpoints the entire state, common for all state backends."), + + INCREMENTAL( + "A checkpoint type that checkpoints only the difference between snapshots, specific for certain state backend."); + + private final InlineElement description; + public static final CheckpointType DEFAULT = CheckpointType.CONFIGURED; + + CheckpointType(String description) { + this.description = text(description); + } + + @Override + public InlineElement getDescription() { + return description; + } +} diff --git a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot index b13156998af..b952536a663 100644 --- a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot +++ b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot @@ -1343,6 +1343,41 @@ } } } + }, { + "url" : "/jobs/:jobid/checkpoints", + "method" : "POST", + "status-code" : "202 Accepted", + "file-upload" : false, + "path-parameters" : { + "pathParameters" : [ { + "key" : "jobid" + } ] + }, + "query-parameters" : { + "queryParameters" : [ ] + }, + "request" : { + "type" : "object", + "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:checkpoints:CheckpointTriggerRequestBody", + "properties" : { + "checkpointType" : { + "type" : "string", + "enum" : [ "CONFIGURED", "FULL", "INCREMENTAL" ] + }, + "triggerId" : { + 
"type" : "any" + } + } + }, + "response" : { + "type" : "object", + "id" : "urn:jsonschema:org:apache:flink:runtime:rest:handler:async:TriggerResponse", + "properties" : { + "request-id" : { + "type" : "any" + } + } + } }, { "url" : "/jobs/:jobid/checkpoints/config", "method" : "GET", @@ -1689,6 +1724,44 @@ } } } + }, { + "url" : "/jobs/:jobid/checkpoints/:triggerid", + "method" : "GET", + "status-code" : "200 OK", + "file-upload" : false, + "path-parameters" : { + "pathParameters" : [ { + "key" : "jobid" + }, { + "key" : "triggerid" + } ] + }, + "query-parameters" : { + "queryParameters" : [ ] + }, + "request" : { + "type" : "any" + }, + "response" : { + "type" : "object", + "id" : "urn:jsonschema:org:apache:flink:runtime:rest:handler:async:AsynchronousOperationResult", + "properties" : { + "status" : { + "type" : "object", + "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:queue:QueueStatus", + "properties" : { + "id" : { + "type" : "string", + "required" : true, + "enum" : [ "IN_PROGRESS", "COMPLETED" ] + } + } + }, + "operation" : { + "type" : "any" + } + } + } }, { "url" : "/jobs/:jobid/config", "method" : "GET", diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java index 0f5033ac8ed..49cc6449b3c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java @@ -21,6 +21,7 @@ package org.apache.flink.runtime.checkpoint; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.JobID; import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.core.execution.CheckpointType; import org.apache.flink.core.execution.SavepointFormatType; import 
org.apache.flink.runtime.checkpoint.FinishedTaskStateProvider.PartialFinishingNotSupportedByStateException; import org.apache.flink.runtime.checkpoint.hooks.MasterHooks; @@ -85,6 +86,8 @@ import java.util.function.Predicate; import java.util.stream.Stream; import static java.util.stream.Collectors.toMap; +import static org.apache.flink.runtime.checkpoint.CheckpointType.CHECKPOINT; +import static org.apache.flink.runtime.checkpoint.CheckpointType.FULL_CHECKPOINT; import static org.apache.flink.util.ExceptionUtils.findThrowable; import static org.apache.flink.util.Preconditions.checkArgument; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -511,6 +514,47 @@ public class CheckpointCoordinator { return triggerCheckpointFromCheckpointThread(checkpointProperties, null, isPeriodic); } + /** + * Triggers one new checkpoint with the given checkpointType. The returned future completes when + * the triggered checkpoint finishes or an error occurred. + * + * @param checkpointType specifies the back up type of the checkpoint to trigger. + * @return a future to the completed checkpoint. 
+ */ + public CompletableFuture<CompletedCheckpoint> triggerCheckpoint(CheckpointType checkpointType) { + + if (checkpointType == null) { + throw new IllegalArgumentException("checkpointType cannot be null"); + } + + final SnapshotType snapshotType; + switch (checkpointType) { + case CONFIGURED: + snapshotType = checkpointProperties.getCheckpointType(); + break; + case FULL: + snapshotType = FULL_CHECKPOINT; + break; + case INCREMENTAL: + snapshotType = CHECKPOINT; + break; + default: + throw new IllegalArgumentException("unknown checkpointType: " + checkpointType); + } + + final CheckpointProperties properties = + new CheckpointProperties( + checkpointProperties.forceCheckpoint(), + snapshotType, + checkpointProperties.discardOnSubsumed(), + checkpointProperties.discardOnJobFinished(), + checkpointProperties.discardOnJobCancelled(), + checkpointProperties.discardOnJobFailed(), + checkpointProperties.discardOnJobSuspended(), + checkpointProperties.isUnclaimed()); + return triggerCheckpointFromCheckpointThread(properties, null, false); + } + @VisibleForTesting CompletableFuture<CompletedCheckpoint> triggerCheckpoint( CheckpointProperties props, @@ -734,7 +778,7 @@ public class CheckpointCoordinator { final SnapshotType type; if (this.forceFullSnapshot && !request.props.isSavepoint()) { - type = CheckpointType.FULL_CHECKPOINT; + type = FULL_CHECKPOINT; } else { type = request.props.getCheckpointType(); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointType.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointType.java index 6ac305dc0d5..08f3927852d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointType.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointType.java @@ -18,9 +18,12 @@ package org.apache.flink.runtime.checkpoint; +import org.apache.flink.annotation.Internal; + import java.util.Objects; /** The type of checkpoint to 
perform. */ +@Internal public final class CheckpointType implements SnapshotType { /** A checkpoint, full or incremental. */ diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java index ca72be6f4d5..89c6d898e75 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java @@ -29,10 +29,12 @@ import org.apache.flink.configuration.CheckpointingOptions; import org.apache.flink.configuration.ClusterOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.HighAvailabilityOptions; +import org.apache.flink.core.execution.CheckpointType; import org.apache.flink.core.execution.SavepointFormatType; import org.apache.flink.metrics.MetricGroup; import org.apache.flink.runtime.blob.BlobServer; import org.apache.flink.runtime.checkpoint.Checkpoints; +import org.apache.flink.runtime.checkpoint.CompletedCheckpoint; import org.apache.flink.runtime.client.DuplicateJobSubmissionException; import org.apache.flink.runtime.client.JobSubmissionException; import org.apache.flink.runtime.clusterframework.ApplicationStatus; @@ -271,6 +273,7 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> this.dispatcherCachedOperationsHandler = new DispatcherCachedOperationsHandler( dispatcherServices.getOperationCaches(), + this::triggerCheckpointAndGetCheckpointID, this::triggerSavepointAndGetLocation, this::stopWithSavepointAndGetLocation); @@ -902,6 +905,28 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> jobID, gateway -> gateway.triggerCheckpoint(timeout)); } + @Override + public CompletableFuture<Acknowledge> triggerCheckpoint( + AsynchronousJobOperationKey operationKey, CheckpointType checkpointType, Time timeout) { + return 
dispatcherCachedOperationsHandler.triggerCheckpoint( + operationKey, checkpointType, timeout); + } + + @Override + public CompletableFuture<OperationResult<Long>> getTriggeredCheckpointStatus( + AsynchronousJobOperationKey operationKey) { + return dispatcherCachedOperationsHandler.getCheckpointStatus(operationKey); + } + + private CompletableFuture<Long> triggerCheckpointAndGetCheckpointID( + final JobID jobID, final CheckpointType checkpointType, final Time timeout) { + return performOperationOnJobMasterGateway( + jobID, + gateway -> + gateway.triggerCheckpoint(checkpointType, timeout) + .thenApply(CompletedCheckpoint::getCheckpointID)); + } + @Override public CompletableFuture<Acknowledge> triggerSavepoint( final AsynchronousJobOperationKey operationKey, diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherCachedOperationsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherCachedOperationsHandler.java index 90daa799cd1..046e51ec404 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherCachedOperationsHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherCachedOperationsHandler.java @@ -20,6 +20,7 @@ package org.apache.flink.runtime.dispatcher; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.time.Time; +import org.apache.flink.core.execution.CheckpointType; import org.apache.flink.core.execution.SavepointFormatType; import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.rest.handler.async.CompletedOperationCache; @@ -39,15 +40,22 @@ public class DispatcherCachedOperationsHandler { private final CompletedOperationCache<AsynchronousJobOperationKey, String> savepointTriggerCache; + private final CompletedOperationCache<AsynchronousJobOperationKey, Long> checkpointTriggerCache; + + private final TriggerCheckpointFunction triggerCheckpointFunction; + private 
final TriggerSavepointFunction triggerSavepointFunction; private final TriggerSavepointFunction stopWithSavepointFunction; DispatcherCachedOperationsHandler( DispatcherOperationCaches operationCaches, + TriggerCheckpointFunction triggerCheckpointFunction, TriggerSavepointFunction triggerSavepointFunction, TriggerSavepointFunction stopWithSavepointFunction) { this( + triggerCheckpointFunction, + operationCaches.getCheckpointTriggerCache(), triggerSavepointFunction, stopWithSavepointFunction, operationCaches.getSavepointTriggerCache()); @@ -55,14 +63,41 @@ public class DispatcherCachedOperationsHandler { @VisibleForTesting DispatcherCachedOperationsHandler( + TriggerCheckpointFunction triggerCheckpointFunction, + CompletedOperationCache<AsynchronousJobOperationKey, Long> checkpointTriggerCache, TriggerSavepointFunction triggerSavepointFunction, TriggerSavepointFunction stopWithSavepointFunction, CompletedOperationCache<AsynchronousJobOperationKey, String> savepointTriggerCache) { + this.triggerCheckpointFunction = triggerCheckpointFunction; + this.checkpointTriggerCache = checkpointTriggerCache; this.triggerSavepointFunction = triggerSavepointFunction; this.stopWithSavepointFunction = stopWithSavepointFunction; this.savepointTriggerCache = savepointTriggerCache; } + public CompletableFuture<Acknowledge> triggerCheckpoint( + AsynchronousJobOperationKey operationKey, CheckpointType checkpointType, Time timeout) { + + if (!checkpointTriggerCache.containsOperation(operationKey)) { + checkpointTriggerCache.registerOngoingOperation( + operationKey, + triggerCheckpointFunction.apply( + operationKey.getJobId(), checkpointType, timeout)); + } + + return CompletableFuture.completedFuture(Acknowledge.get()); + } + + public CompletableFuture<OperationResult<Long>> getCheckpointStatus( + AsynchronousJobOperationKey operationKey) { + return checkpointTriggerCache + .get(operationKey) + .map(CompletableFuture::completedFuture) + .orElse( + FutureUtils.completedExceptionally( + new 
UnknownOperationKeyException(operationKey))); + } + public CompletableFuture<Acknowledge> triggerSavepoint( AsynchronousJobOperationKey operationKey, String targetDirectory, diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherOperationCaches.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherOperationCaches.java index 36331796bb8..85851851b29 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherOperationCaches.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherOperationCaches.java @@ -22,8 +22,10 @@ import org.apache.flink.configuration.RestOptions; import org.apache.flink.runtime.rest.handler.async.CompletedOperationCache; import org.apache.flink.runtime.rest.handler.job.AsynchronousJobOperationKey; import org.apache.flink.util.AutoCloseableAsync; +import org.apache.flink.util.concurrent.FutureUtils; import java.time.Duration; +import java.util.Arrays; import java.util.concurrent.CompletableFuture; /** @@ -33,6 +35,8 @@ public class DispatcherOperationCaches implements AutoCloseableAsync { private final CompletedOperationCache<AsynchronousJobOperationKey, String> savepointTriggerCache; + private final CompletedOperationCache<AsynchronousJobOperationKey, Long> checkpointTriggerCache; + @VisibleForTesting public DispatcherOperationCaches() { this(RestOptions.ASYNC_OPERATION_STORE_DURATION.defaultValue()); @@ -41,14 +45,21 @@ public class DispatcherOperationCaches implements AutoCloseableAsync { @VisibleForTesting public DispatcherOperationCaches(Duration cacheDuration) { savepointTriggerCache = new CompletedOperationCache<>(cacheDuration); + checkpointTriggerCache = new CompletedOperationCache<>(cacheDuration); } public CompletedOperationCache<AsynchronousJobOperationKey, String> getSavepointTriggerCache() { return savepointTriggerCache; } + public CompletedOperationCache<AsynchronousJobOperationKey, Long> getCheckpointTriggerCache() { 
+ return checkpointTriggerCache; + } + @Override public CompletableFuture<Void> closeAsync() { - return savepointTriggerCache.closeAsync(); + return FutureUtils.completeAll( + Arrays.asList( + savepointTriggerCache.closeAsync(), checkpointTriggerCache.closeAsync())); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/TriggerCheckpointFunction.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/TriggerCheckpointFunction.java new file mode 100644 index 00000000000..a874972f91c --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/TriggerCheckpointFunction.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.dispatcher; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.core.execution.CheckpointType; + +import java.util.concurrent.CompletableFuture; + +/** + * Wrapper interface for functions triggering checkpoints. Currently only serves to shorten + * signatures. 
+ */ +@FunctionalInterface +public interface TriggerCheckpointFunction { + CompletableFuture<Long> apply(JobID jobId, CheckpointType checkpointType, Time timeout); +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java index 3dfaa1c8776..27c81b30acb 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java @@ -24,6 +24,7 @@ import org.apache.flink.api.common.functions.AggregateFunction; import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.core.execution.CheckpointType; import org.apache.flink.core.execution.SavepointFormatType; import org.apache.flink.queryablestate.KvStateID; import org.apache.flink.runtime.accumulators.AccumulatorSnapshot; @@ -33,6 +34,7 @@ import org.apache.flink.runtime.blocklist.BlocklistContext; import org.apache.flink.runtime.blocklist.BlocklistHandler; import org.apache.flink.runtime.blocklist.BlocklistUtils; import org.apache.flink.runtime.checkpoint.CheckpointMetrics; +import org.apache.flink.runtime.checkpoint.CompletedCheckpoint; import org.apache.flink.runtime.checkpoint.TaskStateSnapshot; import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.clusterframework.types.ResourceID; @@ -853,6 +855,12 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> return CompletableFuture.completedFuture(schedulerNG.requestJob()); } + @Override + public CompletableFuture<CompletedCheckpoint> triggerCheckpoint( + final CheckpointType checkpointType, final Time timeout) { + return schedulerNG.triggerCheckpoint(checkpointType); + } + @Override public CompletableFuture<String> triggerSavepoint( @Nullable final String targetDirectory, @@ 
-863,11 +871,6 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> return schedulerNG.triggerSavepoint(targetDirectory, cancelJob, formatType); } - @Override - public CompletableFuture<String> triggerCheckpoint(Time timeout) { - return schedulerNG.triggerCheckpoint(); - } - @Override public CompletableFuture<String> stopWithSavepoint( @Nullable final String targetDirectory, diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java index 4cbd4682d66..c99fda1546e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java @@ -22,9 +22,11 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.JobStatus; import org.apache.flink.api.common.functions.AggregateFunction; import org.apache.flink.api.common.time.Time; +import org.apache.flink.core.execution.CheckpointType; import org.apache.flink.core.execution.SavepointFormatType; import org.apache.flink.runtime.blocklist.BlocklistListener; import org.apache.flink.runtime.checkpoint.CheckpointCoordinatorGateway; +import org.apache.flink.runtime.checkpoint.CompletedCheckpoint; import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.execution.ExecutionState; @@ -218,13 +220,26 @@ public interface JobMasterGateway final SavepointFormatType formatType, @RpcTimeout final Time timeout); + /** + * Triggers taking a checkpoint of the executed job. 
+ * + * @param checkpointType to determine how checkpoint should be taken + * @param timeout for the rpc call + * @return Future which is completed with the CompletedCheckpoint once completed + */ + CompletableFuture<CompletedCheckpoint> triggerCheckpoint( + final CheckpointType checkpointType, @RpcTimeout final Time timeout); + /** * Triggers taking a checkpoint of the executed job. * * @param timeout for the rpc call * @return Future which is completed with the checkpoint path once completed */ - CompletableFuture<String> triggerCheckpoint(@RpcTimeout final Time timeout); + default CompletableFuture<String> triggerCheckpoint(@RpcTimeout final Time timeout) { + return triggerCheckpoint(CheckpointType.DEFAULT, timeout) + .thenApply(CompletedCheckpoint::getExternalPointer); + } /** * Stops the job with a savepoint. diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointHandlers.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointHandlers.java new file mode 100644 index 00000000000..871a2dfdabf --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointHandlers.java @@ -0,0 +1,257 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.job.checkpoints; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.core.execution.CheckpointType; +import org.apache.flink.runtime.dispatcher.UnknownOperationKeyException; +import org.apache.flink.runtime.rest.handler.AbstractRestHandler; +import org.apache.flink.runtime.rest.handler.HandlerRequest; +import org.apache.flink.runtime.rest.handler.RestHandlerException; +import org.apache.flink.runtime.rest.handler.async.AsynchronousOperationResult; +import org.apache.flink.runtime.rest.handler.async.TriggerResponse; +import org.apache.flink.runtime.rest.handler.job.AsynchronousJobOperationKey; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.JobIDPathParameter; +import org.apache.flink.runtime.rest.messages.TriggerId; +import org.apache.flink.runtime.rest.messages.TriggerIdPathParameter; +import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointInfo; +import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointStatusHeaders; +import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointStatusMessageParameters; +import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointTriggerHeaders; +import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointTriggerMessageParameters; +import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointTriggerRequestBody; +import org.apache.flink.runtime.rpc.RpcUtils; +import org.apache.flink.runtime.webmonitor.RestfulGateway; +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.SerializedThrowable; + +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; + +import 
javax.annotation.Nonnull; + +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; + +/** + * HTTP handlers for asynchronous triggering of checkpoints. + * + * <p>Drawing checkpoints is a potentially long-running operation. To avoid blocking HTTP + * connections, checkpoints must be drawn in two steps. First, an HTTP request is issued to trigger + * the checkpoint asynchronously. The request will be assigned a {@link TriggerId}, which is + * returned in the response body. Next, the returned {@link TriggerId} should be used to poll the + * status of the checkpoint until it is finished. + * + * <p>A checkpoint is triggered by sending an HTTP {@code POST} request to {@code + * /jobs/:jobid/checkpoints}. The HTTP request may contain a JSON body to specify a customized + * {@link TriggerId} and a {@link CheckpointType}, e.g., + * + * <pre> + * { "triggerId": "7d273f5a62eb4730b9dea8e833733c1e", "checkpointType": "FULL" } + * </pre> + * + * <p>If the body is omitted, or the field {@code checkpointType} is {@code null}, the default + * checkpointType of {@link CheckpointType#CONFIGURED} will be used. As written above, the response + * will contain a request id, e.g., + * + * <pre> + * { "request-id": "7d273f5a62eb4730b9dea8e833733c1e" } + * </pre> + * + * <p>To poll for the status of an ongoing checkpoint, an HTTP {@code GET} request is issued to + * {@code /jobs/:jobid/checkpoints/:checkpointtriggerid}. 
If the specified checkpoint is still + * ongoing, the response will be + * + * <pre> + * { + * "status": { + * "id": "IN_PROGRESS" + * } + * } + * </pre> + * + * <p>If the specified checkpoint has completed, the status id will transition to {@code + * COMPLETED}, and the response will additionally contain information about the checkpoint, such as + * its id: + * + * <pre> + * { + * "status": { + * "id": "COMPLETED" + * }, + * "operation": { + * "checkpointId": 123 + * } + * } + * </pre> + */ +public class CheckpointHandlers { + + /** Handler for the checkpoint trigger operation. */ + public static class CheckpointTriggerHandler + extends AbstractRestHandler< + RestfulGateway, + CheckpointTriggerRequestBody, + TriggerResponse, + CheckpointTriggerMessageParameters> { + + public CheckpointTriggerHandler( + final GatewayRetriever<? extends RestfulGateway> leaderRetriever, + final Time timeout, + final Map<String, String> responseHeaders) { + super( + leaderRetriever, + timeout, + responseHeaders, + CheckpointTriggerHeaders.getInstance()); + } + + private static AsynchronousJobOperationKey createOperationKey( + final HandlerRequest<CheckpointTriggerRequestBody> request) { + final JobID jobId = request.getPathParameter(JobIDPathParameter.class); + return AsynchronousJobOperationKey.of( + request.getRequestBody().getTriggerId().orElseGet(TriggerId::new), jobId); + } + + @Override + protected CompletableFuture<TriggerResponse> handleRequest( + @Nonnull HandlerRequest<CheckpointTriggerRequestBody> request, + @Nonnull RestfulGateway gateway) + throws RestHandlerException { + final AsynchronousJobOperationKey operationKey = createOperationKey(request); + + return gateway.triggerCheckpoint( + operationKey, + request.getRequestBody().getCheckpointType(), + RpcUtils.INF_TIMEOUT) + .handle( + (acknowledge, throwable) -> { + if (throwable == null) { + return new TriggerResponse(operationKey.getTriggerId()); + } else { + throw new CompletionException( +
createInternalServerError( + throwable, operationKey, "triggering")); + } + }); + } + } + + /** HTTP handler to query for the status of the checkpoint. */ + public static class CheckpointStatusHandler + extends AbstractRestHandler< + RestfulGateway, + EmptyRequestBody, + AsynchronousOperationResult<CheckpointInfo>, + CheckpointStatusMessageParameters> { + + public CheckpointStatusHandler( + final GatewayRetriever<? extends RestfulGateway> leaderRetriever, + final Time timeout, + final Map<String, String> responseHeaders) { + super(leaderRetriever, timeout, responseHeaders, CheckpointStatusHeaders.getInstance()); + } + + @Override + public CompletableFuture<AsynchronousOperationResult<CheckpointInfo>> handleRequest( + @Nonnull HandlerRequest<EmptyRequestBody> request, @Nonnull RestfulGateway gateway) + throws RestHandlerException { + + final AsynchronousJobOperationKey key = getOperationKey(request); + + return gateway.getTriggeredCheckpointStatus(key) + .handle( + (operationResult, throwable) -> { + if (throwable == null) { + switch (operationResult.getStatus()) { + case SUCCESS: + return AsynchronousOperationResult.completed( + operationResultResponse( + operationResult.getResult())); + case FAILURE: + return AsynchronousOperationResult.completed( + exceptionalOperationResultResponse( + operationResult.getThrowable())); + case IN_PROGRESS: + return AsynchronousOperationResult.inProgress(); + default: + throw new IllegalStateException( + "No handler for operation status " + + operationResult.getStatus() + + ", encountered for key " + + key); + } + } else { + throw new CompletionException( + maybeCreateNotFoundError(throwable, key) + .orElseGet( + () -> + createInternalServerError( + throwable, + key, + "retrieving status of"))); + } + }); + } + + private static Optional<RestHandlerException> maybeCreateNotFoundError( + Throwable throwable, AsynchronousJobOperationKey key) { + if (ExceptionUtils.findThrowable(throwable, UnknownOperationKeyException.class) + 
.isPresent()) { + return Optional.of( + new RestHandlerException( + String.format( + "There is no checkpoint operation with triggerId=%s for job %s.", + key.getTriggerId(), key.getJobId()), + HttpResponseStatus.NOT_FOUND)); + } + return Optional.empty(); + } + + private static AsynchronousJobOperationKey getOperationKey( + HandlerRequest<EmptyRequestBody> request) { + final TriggerId triggerId = request.getPathParameter(TriggerIdPathParameter.class); + final JobID jobId = request.getPathParameter(JobIDPathParameter.class); + return AsynchronousJobOperationKey.of(triggerId, jobId); + } + + private static CheckpointInfo exceptionalOperationResultResponse( + final Throwable throwable) { + return new CheckpointInfo(null, new SerializedThrowable(throwable)); + } + + private static CheckpointInfo operationResultResponse(final Long checkpointId) { + return new CheckpointInfo(checkpointId, null); + } + } + + private static RestHandlerException createInternalServerError( + Throwable throwable, AsynchronousJobOperationKey key, String errorMessageInfix) { + return new RestHandlerException( + String.format( + "Internal server error while %s checkpoint operation with triggerId=%s for job %s.", + errorMessageInfix, key.getTriggerId(), key.getJobId()), + HttpResponseStatus.INTERNAL_SERVER_ERROR, + throwable); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/savepoints/SavepointInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointInfo.java similarity index 75% copy from flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/savepoints/SavepointInfo.java copy to flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointInfo.java index 33cd1530e48..62639b839fa 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/savepoints/SavepointInfo.java +++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointInfo.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.flink.runtime.rest.messages.job.savepoints; +package org.apache.flink.runtime.rest.messages.checkpoints; import org.apache.flink.runtime.rest.messages.ResponseBody; import org.apache.flink.runtime.rest.messages.json.SerializedThrowableDeserializer; @@ -24,6 +24,7 @@ import org.apache.flink.runtime.rest.messages.json.SerializedThrowableSerializer import org.apache.flink.util.SerializedThrowable; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize; @@ -33,17 +34,17 @@ import javax.annotation.Nullable; import static org.apache.flink.util.Preconditions.checkArgument; -/** Represents information about a finished savepoint. */ +/** Represents information about a triggered checkpoint. 
*/ @JsonInclude(JsonInclude.Include.NON_NULL) -public class SavepointInfo implements ResponseBody { +public class CheckpointInfo implements ResponseBody { - private static final String FIELD_NAME_LOCATION = "location"; + private static final String FIELD_NAME_CHECKPOINT_ID = "checkpointId"; - private static final String FIELD_NAME_FAILURE_CAUSE = "failure-cause"; + private static final String FIELD_NAME_FAILURE_CAUSE = "failureCause"; - @JsonProperty(FIELD_NAME_LOCATION) + @JsonProperty(FIELD_NAME_CHECKPOINT_ID) @Nullable - private final String location; + private final Long checkpointId; @JsonProperty(FIELD_NAME_FAILURE_CAUSE) @JsonSerialize(using = SerializedThrowableSerializer.class) @@ -52,26 +53,28 @@ public class SavepointInfo implements ResponseBody { private final SerializedThrowable failureCause; @JsonCreator - public SavepointInfo( - @JsonProperty(FIELD_NAME_LOCATION) @Nullable final String location, + public CheckpointInfo( + @JsonProperty(FIELD_NAME_CHECKPOINT_ID) @Nullable final Long checkpointId, @JsonProperty(FIELD_NAME_FAILURE_CAUSE) @JsonDeserialize(using = SerializedThrowableDeserializer.class) @Nullable final SerializedThrowable failureCause) { checkArgument( - location != null ^ failureCause != null, - "Either location or failureCause must be set"); + checkpointId != null ^ failureCause != null, + "Either checkpointId or failureCause must be set"); - this.location = location; + this.checkpointId = checkpointId; this.failureCause = failureCause; } @Nullable - public String getLocation() { - return location; + @JsonIgnore + public Long getCheckpointId() { + return checkpointId; } @Nullable + @JsonIgnore public SerializedThrowable getFailureCause() { return failureCause; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointStatusHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointStatusHeaders.java new file mode 100644 index 
00000000000..ef6aa5994b2 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointStatusHeaders.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages.checkpoints; + +import org.apache.flink.runtime.rest.HttpMethodWrapper; +import org.apache.flink.runtime.rest.handler.async.AsynchronousOperationStatusMessageHeaders; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.JobIDPathParameter; +import org.apache.flink.runtime.rest.messages.TriggerIdPathParameter; + +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; + +/** These headers define the protocol for querying the status of a checkpoint operation. 
*/ +public class CheckpointStatusHeaders + extends AsynchronousOperationStatusMessageHeaders< + CheckpointInfo, CheckpointStatusMessageParameters> { + + private static final CheckpointStatusHeaders INSTANCE = new CheckpointStatusHeaders(); + + private static final String URL = + String.format( + "/jobs/:%s/checkpoints/:%s", + JobIDPathParameter.KEY, TriggerIdPathParameter.KEY); + + private CheckpointStatusHeaders() {} + + @Override + public Class<EmptyRequestBody> getRequestClass() { + return EmptyRequestBody.class; + } + + @Override + public HttpResponseStatus getResponseStatusCode() { + return HttpResponseStatus.OK; + } + + @Override + public CheckpointStatusMessageParameters getUnresolvedMessageParameters() { + return new CheckpointStatusMessageParameters(); + } + + @Override + public HttpMethodWrapper getHttpMethod() { + return HttpMethodWrapper.GET; + } + + @Override + public String getTargetRestEndpointURL() { + return URL; + } + + @Override + public Class<CheckpointInfo> getValueClass() { + return CheckpointInfo.class; + } + + @Override + public String getDescription() { + return "Returns the status of a checkpoint trigger operation."; + } + + public static CheckpointStatusHeaders getInstance() { + return INSTANCE; + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointStatusMessageParameters.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointStatusMessageParameters.java new file mode 100644 index 00000000000..d0b0d8b7816 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointStatusMessageParameters.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages.checkpoints; + +import org.apache.flink.runtime.rest.messages.JobIDPathParameter; +import org.apache.flink.runtime.rest.messages.MessageParameters; +import org.apache.flink.runtime.rest.messages.MessagePathParameter; +import org.apache.flink.runtime.rest.messages.MessageQueryParameter; +import org.apache.flink.runtime.rest.messages.TriggerIdPathParameter; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; + +/** The parameters for querying the status of a checkpoint trigger operation.
*/ +public class CheckpointStatusMessageParameters extends MessageParameters { + + public final JobIDPathParameter jobIdPathParameter = new JobIDPathParameter(); + + public final TriggerIdPathParameter triggerIdPathParameter = new TriggerIdPathParameter(); + + @Override + public Collection<MessagePathParameter<?>> getPathParameters() { + return Collections.unmodifiableCollection( + Arrays.asList(jobIdPathParameter, triggerIdPathParameter)); + } + + @Override + public Collection<MessageQueryParameter<?>> getQueryParameters() { + return Collections.emptyList(); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointTriggerHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointTriggerHeaders.java new file mode 100644 index 00000000000..6592e67f06d --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointTriggerHeaders.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.rest.messages.checkpoints; + +import org.apache.flink.runtime.rest.HttpMethodWrapper; +import org.apache.flink.runtime.rest.handler.async.AsynchronousOperationTriggerMessageHeaders; +import org.apache.flink.runtime.rest.messages.JobIDPathParameter; + +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; + +/** These headers define the protocol for triggering a checkpoint. */ +public class CheckpointTriggerHeaders + extends AsynchronousOperationTriggerMessageHeaders< + CheckpointTriggerRequestBody, CheckpointTriggerMessageParameters> { + + private static final CheckpointTriggerHeaders INSTANCE = new CheckpointTriggerHeaders(); + + private static final String URL = + String.format("/jobs/:%s/checkpoints", JobIDPathParameter.KEY); + + private CheckpointTriggerHeaders() {} + + @Override + public Class<CheckpointTriggerRequestBody> getRequestClass() { + return CheckpointTriggerRequestBody.class; + } + + @Override + public HttpResponseStatus getResponseStatusCode() { + return HttpResponseStatus.ACCEPTED; + } + + @Override + public CheckpointTriggerMessageParameters getUnresolvedMessageParameters() { + return new CheckpointTriggerMessageParameters(); + } + + @Override + public HttpMethodWrapper getHttpMethod() { + return HttpMethodWrapper.POST; + } + + @Override + public String getTargetRestEndpointURL() { + return URL; + } + + @Override + protected String getAsyncOperationDescription() { + return "Triggers a checkpoint."; + } + + @Override + public String operationId() { + return "triggerCheckpoint"; + } + + public static CheckpointTriggerHeaders getInstance() { + return INSTANCE; + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointTriggerMessageParameters.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointTriggerMessageParameters.java new file mode 100644 index 00000000000..293eb59af4d --- 
/dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointTriggerMessageParameters.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages.checkpoints; + +import org.apache.flink.runtime.rest.messages.JobIDPathParameter; +import org.apache.flink.runtime.rest.messages.MessageParameters; +import org.apache.flink.runtime.rest.messages.MessagePathParameter; +import org.apache.flink.runtime.rest.messages.MessageQueryParameter; + +import java.util.Collection; +import java.util.Collections; + +/** The parameters for triggering a checkpoint. 
*/ +public class CheckpointTriggerMessageParameters extends MessageParameters { + + public final JobIDPathParameter jobID = new JobIDPathParameter(); + + @Override + public Collection<MessagePathParameter<?>> getPathParameters() { + return Collections.singleton(jobID); + } + + @Override + public Collection<MessageQueryParameter<?>> getQueryParameters() { + return Collections.emptyList(); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointTriggerRequestBody.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointTriggerRequestBody.java new file mode 100644 index 00000000000..1871ef56e3d --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointTriggerRequestBody.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.rest.messages.checkpoints; + +import org.apache.flink.core.execution.CheckpointType; +import org.apache.flink.runtime.rest.messages.RequestBody; +import org.apache.flink.runtime.rest.messages.TriggerId; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; + +import javax.annotation.Nullable; + +import java.util.Optional; + +/** {@link RequestBody} to trigger checkpoints. */ +public class CheckpointTriggerRequestBody implements RequestBody { + + private static final String FIELD_NAME_TRIGGER_ID = "triggerId"; + private static final String FIELD_NAME_CHECKPOINT_TYPE = "checkpointType"; + + @JsonProperty(FIELD_NAME_TRIGGER_ID) + @Nullable + private final TriggerId triggerId; + + @JsonProperty(FIELD_NAME_CHECKPOINT_TYPE) + private final CheckpointType checkpointType; + + @JsonCreator + public CheckpointTriggerRequestBody( + @Nullable @JsonProperty(FIELD_NAME_CHECKPOINT_TYPE) final CheckpointType checkpointType, + @Nullable @JsonProperty(FIELD_NAME_TRIGGER_ID) TriggerId triggerId) { + this.triggerId = triggerId; + this.checkpointType = checkpointType != null ? 
checkpointType : CheckpointType.DEFAULT; + } + + @JsonIgnore + public Optional<TriggerId> getTriggerId() { + return Optional.ofNullable(triggerId); + } + + @JsonIgnore + public CheckpointType getCheckpointType() { + return checkpointType; + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/savepoints/SavepointInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/savepoints/SavepointInfo.java index 33cd1530e48..174405edbea 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/savepoints/SavepointInfo.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/savepoints/SavepointInfo.java @@ -24,6 +24,7 @@ import org.apache.flink.runtime.rest.messages.json.SerializedThrowableSerializer import org.apache.flink.util.SerializedThrowable; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize; @@ -67,11 +68,13 @@ public class SavepointInfo implements ResponseBody { } @Nullable + @JsonIgnore public String getLocation() { return location; } @Nullable + @JsonIgnore public SerializedThrowable getFailureCause() { return failureCause; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java index c822dc92c81..02c19f92391 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java @@ -26,6 +26,7 @@ import 
org.apache.flink.api.common.JobStatus; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.MetricOptions; import org.apache.flink.configuration.WebOptions; +import org.apache.flink.core.execution.CheckpointType; import org.apache.flink.core.execution.SavepointFormatType; import org.apache.flink.metrics.Gauge; import org.apache.flink.metrics.MetricGroup; @@ -878,7 +879,7 @@ public abstract class SchedulerBase implements SchedulerNG, CheckpointScheduling } @Override - public CompletableFuture<String> triggerCheckpoint() { + public CompletableFuture<CompletedCheckpoint> triggerCheckpoint(CheckpointType checkpointType) { mainThreadExecutor.assertRunningInMainThread(); final CheckpointCoordinator checkpointCoordinator = @@ -890,8 +891,7 @@ public abstract class SchedulerBase implements SchedulerNG, CheckpointScheduling log.info("Triggering a manual checkpoint for job {}.", jobID); return checkpointCoordinator - .triggerCheckpoint(false) - .thenApply(CompletedCheckpoint::getExternalPointer) + .triggerCheckpoint(checkpointType) .handleAsync( (path, throwable) -> { if (throwable != null) { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerNG.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerNG.java index 68877e287cd..70b0c3b8807 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerNG.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerNG.java @@ -21,10 +21,12 @@ package org.apache.flink.runtime.scheduler; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.JobStatus; +import org.apache.flink.core.execution.CheckpointType; import org.apache.flink.core.execution.SavepointFormatType; import org.apache.flink.queryablestate.KvStateID; import org.apache.flink.runtime.accumulators.AccumulatorSnapshot; import org.apache.flink.runtime.checkpoint.CheckpointMetrics; +import 
org.apache.flink.runtime.checkpoint.CompletedCheckpoint; import org.apache.flink.runtime.checkpoint.TaskStateSnapshot; import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor; import org.apache.flink.runtime.execution.ExecutionState; @@ -125,7 +127,7 @@ public interface SchedulerNG extends GlobalFailureHandler, AutoCloseableAsync { CompletableFuture<String> triggerSavepoint( @Nullable String targetDirectory, boolean cancelJob, SavepointFormatType formatType); - CompletableFuture<String> triggerCheckpoint(); + CompletableFuture<CompletedCheckpoint> triggerCheckpoint(CheckpointType checkpointType); void acknowledgeCheckpoint( JobID jobID, diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java index d76939bbb57..b8417a0f426 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java @@ -26,6 +26,7 @@ import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.configuration.MetricOptions; import org.apache.flink.configuration.SchedulerExecutionMode; import org.apache.flink.configuration.WebOptions; +import org.apache.flink.core.execution.CheckpointType; import org.apache.flink.core.execution.SavepointFormatType; import org.apache.flink.queryablestate.KvStateID; import org.apache.flink.runtime.JobException; @@ -37,6 +38,7 @@ import org.apache.flink.runtime.checkpoint.CheckpointMetrics; import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory; import org.apache.flink.runtime.checkpoint.CheckpointScheduling; import org.apache.flink.runtime.checkpoint.CheckpointsCleaner; +import org.apache.flink.runtime.checkpoint.CompletedCheckpoint; import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore; import 
org.apache.flink.runtime.checkpoint.TaskStateSnapshot; import org.apache.flink.runtime.client.JobExecutionException; @@ -623,10 +625,11 @@ public class AdaptiveScheduler } @Override - public CompletableFuture<String> triggerCheckpoint() { + public CompletableFuture<CompletedCheckpoint> triggerCheckpoint(CheckpointType checkpointType) { return state.tryCall( StateWithExecutionGraph.class, - StateWithExecutionGraph::triggerCheckpoint, + stateWithExecutionGraph -> + stateWithExecutionGraph.triggerCheckpoint(checkpointType), "triggerCheckpoint") .orElse( FutureUtils.completedExceptionally( diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateWithExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateWithExecutionGraph.java index 8ba39311714..953f36a2573 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateWithExecutionGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateWithExecutionGraph.java @@ -20,6 +20,7 @@ package org.apache.flink.runtime.scheduler.adaptive; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.JobStatus; +import org.apache.flink.core.execution.CheckpointType; import org.apache.flink.core.execution.SavepointFormatType; import org.apache.flink.queryablestate.KvStateID; import org.apache.flink.runtime.accumulators.AccumulatorSnapshot; @@ -275,7 +276,7 @@ abstract class StateWithExecutionGraph implements State { context.getMainThreadExecutor()); } - CompletableFuture<String> triggerCheckpoint() { + CompletableFuture<CompletedCheckpoint> triggerCheckpoint(CheckpointType checkpointType) { final CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator(); final JobID jobID = executionGraph.getJobID(); @@ -286,8 +287,7 @@ abstract class StateWithExecutionGraph implements State { logger.info("Triggering a checkpoint for job {}.", jobID); return 
checkpointCoordinator - .triggerCheckpoint(false) - .thenApply(CompletedCheckpoint::getExternalPointer) + .triggerCheckpoint(checkpointType) .handleAsync( (path, throwable) -> { if (throwable != null) { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java index 44e189da515..4f9085cda2e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java @@ -22,7 +22,9 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.JobStatus; import org.apache.flink.api.common.time.Time; import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.core.execution.CheckpointType; import org.apache.flink.core.execution.SavepointFormatType; +import org.apache.flink.runtime.checkpoint.CompletedCheckpoint; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.dispatcher.TriggerSavepointMode; import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; @@ -143,6 +145,34 @@ public interface RestfulGateway extends RpcGateway { */ CompletableFuture<ThreadDumpInfo> requestThreadDump(@RpcTimeout Time timeout); + /** + * Triggers a checkpoint, returning a future that completes when the operation is started. + * + * @param operationKey the key of the operation, for deduplication purposes + * @param checkpointType checkpoint backup type (configured / full / incremental) + * @param timeout Timeout for the asynchronous operation + * @return Future which is completed once the operation is started; the ID of the resulting + * {@link CompletedCheckpoint} can be retrieved via getTriggeredCheckpointStatus.
+ */ + default CompletableFuture<Acknowledge> triggerCheckpoint( + AsynchronousJobOperationKey operationKey, + CheckpointType checkpointType, + @RpcTimeout Time timeout) { + throw new UnsupportedOperationException(); + } + + /** + * Get the status of a checkpoint triggered under the specified operation key. + * + * @param operationKey key of the operation + * @return Future which completes immediately with the status, or fails if no operation is + * registered for the key + */ + default CompletableFuture<OperationResult<Long>> getTriggeredCheckpointStatus( + AsynchronousJobOperationKey operationKey) { + throw new UnsupportedOperationException(); + } + /** * Triggers a savepoint with the given savepoint directory as a target, returning a future that * completes when the operation is started. diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java index 6a0c4320a3b..259495a3f33 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java @@ -67,6 +67,7 @@ import org.apache.flink.runtime.rest.handler.job.SubtaskExecutionAttemptDetailsH import org.apache.flink.runtime.rest.handler.job.SubtasksAllAccumulatorsHandler; import org.apache.flink.runtime.rest.handler.job.SubtasksTimesHandler; import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointConfigHandler; +import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointHandlers; import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointStatisticDetailsHandler; import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointStatsCache; import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointingStatisticsHandler; @@ -545,6 +546,14 @@ public class WebMonitorEndpoint<T extends RestfulGateway> extends 
RestServerEndp new SavepointHandlers.SavepointStatusHandler( leaderRetriever, timeout, responseHeaders); + final CheckpointHandlers.CheckpointTriggerHandler checkpointTriggerHandler = + new CheckpointHandlers.CheckpointTriggerHandler( + leaderRetriever, timeout, responseHeaders); + + final CheckpointHandlers.CheckpointStatusHandler checkpointStatusHandler = + new CheckpointHandlers.CheckpointStatusHandler( + leaderRetriever, timeout, responseHeaders); + final SubtaskExecutionAttemptDetailsHandler subtaskExecutionAttemptDetailsHandler = new SubtaskExecutionAttemptDetailsHandler( leaderRetriever, @@ -776,6 +785,10 @@ public class WebMonitorEndpoint<T extends RestfulGateway> extends RestServerEndp handlers.add( Tuple2.of(stopWithSavepointHandler.getMessageHeaders(), stopWithSavepointHandler)); handlers.add(Tuple2.of(savepointStatusHandler.getMessageHeaders(), savepointStatusHandler)); + handlers.add( + Tuple2.of(checkpointTriggerHandler.getMessageHeaders(), checkpointTriggerHandler)); + handlers.add( + Tuple2.of(checkpointStatusHandler.getMessageHeaders(), checkpointStatusHandler)); handlers.add( Tuple2.of( subtaskExecutionAttemptDetailsHandler.getMessageHeaders(), diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTriggeringTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTriggeringTest.java index 66347194648..36ae8f3defe 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTriggeringTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTriggeringTest.java @@ -43,6 +43,7 @@ import org.apache.flink.util.concurrent.FutureUtils; import org.apache.flink.util.concurrent.ManuallyTriggeredScheduledExecutor; import org.apache.flink.util.concurrent.ScheduledExecutorServiceAdapter; +import org.assertj.core.api.Assertions; import org.junit.Before; import org.junit.ClassRule; import 
org.junit.Rule; @@ -287,6 +288,171 @@ public class CheckpointCoordinatorTriggeringTest extends TestLogger { is(CheckpointType.FULL_CHECKPOINT)); } + @Test + public void testTriggeringCheckpointsWithNullCheckpointType() throws Exception { + CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway gateway = + new CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway(); + + JobVertexID jobVertexID = new JobVertexID(); + ExecutionGraph graph = + new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder() + .addJobVertex(jobVertexID) + .setTaskManagerGateway(gateway) + .build(EXECUTOR_RESOURCE.getExecutor()); + + final StandaloneCompletedCheckpointStore checkpointStore = + new StandaloneCompletedCheckpointStore(1); + final StandaloneCheckpointIDCounter checkpointIDCounter = + new StandaloneCheckpointIDCounter(); + + CheckpointCoordinator checkpointCoordinator = + createCheckpointCoordinator(graph, checkpointStore, checkpointIDCounter); + + checkpointCoordinator.startCheckpointScheduler(); + gateway.resetCount(); + + Assertions.assertThatExceptionOfType(IllegalArgumentException.class) + .isThrownBy(() -> checkpointCoordinator.triggerCheckpoint(null)); + } + + @Test + public void testTriggeringCheckpointsWithIncrementalCheckpointType() throws Exception { + CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway gateway = + new CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway(); + + JobVertexID jobVertexID = new JobVertexID(); + ExecutionGraph graph = + new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder() + .addJobVertex(jobVertexID) + .setTaskManagerGateway(gateway) + .build(EXECUTOR_RESOURCE.getExecutor()); + + ExecutionVertex vertex = graph.getJobVertex(jobVertexID).getTaskVertices()[0]; + ExecutionAttemptID attemptID = vertex.getCurrentExecutionAttempt().getAttemptId(); + + final StandaloneCompletedCheckpointStore checkpointStore = + new 
StandaloneCompletedCheckpointStore(1); + final StandaloneCheckpointIDCounter checkpointIDCounter = + new StandaloneCheckpointIDCounter(); + CheckpointCoordinator checkpointCoordinator = + createCheckpointCoordinator(graph, checkpointStore, checkpointIDCounter); + + checkpointCoordinator.startCheckpointScheduler(); + gateway.resetCount(); + + // trigger an incremental type checkpoint + final CompletableFuture<CompletedCheckpoint> checkpoint = + checkpointCoordinator.triggerCheckpoint( + org.apache.flink.core.execution.CheckpointType.INCREMENTAL); + + manuallyTriggeredScheduledExecutor.triggerAll(); + checkpointCoordinator.receiveAcknowledgeMessage( + new AcknowledgeCheckpoint(graph.getJobID(), attemptID, 1), + TASK_MANAGER_LOCATION_INFO); + checkpoint.get(); + Assertions.assertThat( + gateway.getOnlyTriggeredCheckpoint(attemptID) + .checkpointOptions + .getCheckpointType()) + .isEqualTo(CheckpointType.CHECKPOINT); + } + + @Test + public void testTriggeringCheckpointsWithFullCheckpointType() throws Exception { + CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway gateway = + new CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway(); + + JobVertexID jobVertexID = new JobVertexID(); + ExecutionGraph graph = + new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder() + .addJobVertex(jobVertexID) + .setTaskManagerGateway(gateway) + .build(EXECUTOR_RESOURCE.getExecutor()); + + ExecutionVertex vertex = graph.getJobVertex(jobVertexID).getTaskVertices()[0]; + ExecutionAttemptID attemptID = vertex.getCurrentExecutionAttempt().getAttemptId(); + + final StandaloneCompletedCheckpointStore checkpointStore = + new StandaloneCompletedCheckpointStore(1); + final StandaloneCheckpointIDCounter checkpointIDCounter = + new StandaloneCheckpointIDCounter(); + CheckpointCoordinator checkpointCoordinator = + createCheckpointCoordinator(graph, checkpointStore, checkpointIDCounter); + + checkpointCoordinator.startCheckpointScheduler(); + 
gateway.resetCount(); + + // trigger a full type checkpoint + final CompletableFuture<CompletedCheckpoint> checkpoint = + checkpointCoordinator.triggerCheckpoint( + org.apache.flink.core.execution.CheckpointType.FULL); + + manuallyTriggeredScheduledExecutor.triggerAll(); + checkpointCoordinator.receiveAcknowledgeMessage( + new AcknowledgeCheckpoint(graph.getJobID(), attemptID, 1), + TASK_MANAGER_LOCATION_INFO); + checkpoint.get(); + Assertions.assertThat( + gateway.getOnlyTriggeredCheckpoint(attemptID) + .checkpointOptions + .getCheckpointType()) + .isEqualTo(CheckpointType.FULL_CHECKPOINT); + } + + @Test + public void testTriggeringCheckpointsWithCheckpointTypeAfterNoClaimSavepoint() + throws Exception { + CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway gateway = + new CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway(); + + JobVertexID jobVertexID = new JobVertexID(); + ExecutionGraph graph = + new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder() + .addJobVertex(jobVertexID) + .setTaskManagerGateway(gateway) + .build(EXECUTOR_RESOURCE.getExecutor()); + + ExecutionVertex vertex = graph.getJobVertex(jobVertexID).getTaskVertices()[0]; + ExecutionAttemptID attemptID = vertex.getCurrentExecutionAttempt().getAttemptId(); + + // create a savepoint that we can restore from later + final CompletedCheckpoint savepoint = takeSavepoint(graph, attemptID); + + // restore from a savepoint in NO_CLAIM mode + final StandaloneCompletedCheckpointStore checkpointStore = + new StandaloneCompletedCheckpointStore(1); + final StandaloneCheckpointIDCounter checkpointIDCounter = + new StandaloneCheckpointIDCounter(); + CheckpointCoordinator checkpointCoordinator = + createCheckpointCoordinator(graph, checkpointStore, checkpointIDCounter); + checkpointCoordinator.restoreSavepoint( + SavepointRestoreSettings.forPath( + savepoint.getExternalPointer(), true, RestoreMode.NO_CLAIM), + graph.getAllVertices(), +
this.getClass().getClassLoader()); + + // trigger a savepoint before any checkpoint completes + // next triggered checkpoint should still be a full one + takeSavepoint(graph, attemptID, checkpointCoordinator, 2); + checkpointCoordinator.startCheckpointScheduler(); + gateway.resetCount(); + // the checkpoint should be a FULL_CHECKPOINT even if it is specified as incremental + final CompletableFuture<CompletedCheckpoint> checkpoint = + checkpointCoordinator.triggerCheckpoint( + org.apache.flink.core.execution.CheckpointType.INCREMENTAL); + manuallyTriggeredScheduledExecutor.triggerAll(); + checkpointCoordinator.receiveAcknowledgeMessage( + new AcknowledgeCheckpoint(graph.getJobID(), attemptID, 3), + TASK_MANAGER_LOCATION_INFO); + checkpoint.get(); + + Assertions.assertThat( + gateway.getOnlyTriggeredCheckpoint(attemptID) + .checkpointOptions + .getCheckpointType()) + .isEqualTo(CheckpointType.FULL_CHECKPOINT); + } + private CompletedCheckpoint takeSavepoint(ExecutionGraph graph, ExecutionAttemptID attemptID) throws Exception { CheckpointCoordinator checkpointCoordinator = @@ -324,7 +490,7 @@ public class CheckpointCoordinatorTriggeringTest extends TestLogger { CheckpointCoordinatorConfiguration checkpointCoordinatorConfiguration = new CheckpointCoordinatorConfigurationBuilder() .setCheckpointInterval( - 10000) // periodic is ver long, we trigger checkpoint manually + 10000) // periodic is very long, we trigger checkpoint manually .setCheckpointTimeout(200000) // timeout is very long (200 s) .setMaxConcurrentCheckpoints(Integer.MAX_VALUE) .build(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherCachedOperationsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherCachedOperationsHandlerTest.java index 4cc522b7fe2..1272f41e3c9 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherCachedOperationsHandlerTest.java +++
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherCachedOperationsHandlerTest.java @@ -20,8 +20,10 @@ package org.apache.flink.runtime.dispatcher; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.time.Time; +import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple4; import org.apache.flink.configuration.RestOptions; +import org.apache.flink.core.execution.CheckpointType; import org.apache.flink.core.execution.SavepointFormatType; import org.apache.flink.core.testutils.FlinkMatchers; import org.apache.flink.runtime.messages.Acknowledge; @@ -51,12 +53,15 @@ public class DispatcherCachedOperationsHandlerTest extends TestLogger { private static final Time TIMEOUT = Time.minutes(10); - private CompletedOperationCache<AsynchronousJobOperationKey, String> cache; + private CompletedOperationCache<AsynchronousJobOperationKey, Long> checkpointTriggerCache; + private CompletedOperationCache<AsynchronousJobOperationKey, String> savepointTriggerCache; private DispatcherCachedOperationsHandler handler; + private TriggerCheckpointSpyFunction triggerCheckpointFunction; private TriggerSavepointSpyFunction triggerSavepointFunction; private TriggerSavepointSpyFunction stopWithSavepointFunction; + private CompletableFuture<Long> checkpointIdFuture = new CompletableFuture<>(); private CompletableFuture<String> savepointLocationFuture = new CompletableFuture<>(); private final JobID jobID = new JobID(); private final String targetDirectory = "dummyDirectory"; @@ -64,6 +69,18 @@ public class DispatcherCachedOperationsHandlerTest extends TestLogger { @BeforeEach public void setup() { + + checkpointIdFuture = new CompletableFuture<>(); + triggerCheckpointFunction = + TriggerCheckpointSpyFunction.wrap( + new TriggerCheckpointSpyFunction() { + @Override + CompletableFuture<Long> applyWrappedFunction( + JobID jobID, CheckpointType checkpointType, Time timeout) { + return checkpointIdFuture; + } + }); + 
savepointLocationFuture = new CompletableFuture<>(); triggerSavepointFunction = TriggerSavepointSpyFunction.wrap( @@ -73,12 +90,21 @@ public class DispatcherCachedOperationsHandlerTest extends TestLogger { TriggerSavepointSpyFunction.wrap( (jobID, targetDirectory, formatType, savepointMode, timeout) -> savepointLocationFuture); - cache = + + checkpointTriggerCache = + new CompletedOperationCache<>( + RestOptions.ASYNC_OPERATION_STORE_DURATION.defaultValue()); + + savepointTriggerCache = new CompletedOperationCache<>( RestOptions.ASYNC_OPERATION_STORE_DURATION.defaultValue()); handler = new DispatcherCachedOperationsHandler( - triggerSavepointFunction, stopWithSavepointFunction, cache); + triggerCheckpointFunction, + checkpointTriggerCache, + triggerSavepointFunction, + stopWithSavepointFunction, + savepointTriggerCache); operationKey = AsynchronousJobOperationKey.of(new TriggerId(), jobID); } @@ -165,12 +191,14 @@ public class DispatcherCachedOperationsHandlerTest extends TestLogger { .get(); // should not complete because we wait for the result to be accessed - assertThat(cache.closeAsync(), FlinkMatchers.willNotComplete(Duration.ofMillis(10))); + assertThat( + savepointTriggerCache.closeAsync(), + FlinkMatchers.willNotComplete(Duration.ofMillis(10))); } @Test public void throwsIfCacheIsShuttingDown() { - cache.closeAsync(); + savepointTriggerCache.closeAsync(); assertThrows( IllegalStateException.class, () -> @@ -208,6 +236,41 @@ public class DispatcherCachedOperationsHandlerTest extends TestLogger { assertThat(statusFuture, futureFailedWith(UnknownOperationKeyException.class)); } + private abstract static class TriggerCheckpointSpyFunction + implements TriggerCheckpointFunction { + + private final List<Tuple2<JobID, CheckpointType>> invocations = new ArrayList<>(); + + @Override + public CompletableFuture<Long> apply( + JobID jobID, CheckpointType checkpointType, Time timeout) { + invocations.add(new Tuple2<>(jobID, checkpointType)); + return 
applyWrappedFunction(jobID, checkpointType, timeout); + } + + abstract CompletableFuture<Long> applyWrappedFunction( + JobID jobID, CheckpointType checkpointType, Time timeout); + + public List<Tuple2<JobID, CheckpointType>> getInvocationParameters() { + return invocations; + } + + public int getNumberOfInvocations() { + return invocations.size(); + } + + public static TriggerCheckpointSpyFunction wrap( + TriggerCheckpointSpyFunction wrappedFunction) { + return new TriggerCheckpointSpyFunction() { + @Override + CompletableFuture<Long> applyWrappedFunction( + JobID jobID, CheckpointType checkpointType, Time timeout) { + return wrappedFunction.apply(jobID, checkpointType, timeout); + } + }; + } + } + private abstract static class TriggerSavepointSpyFunction implements TriggerSavepointFunction { private final List<Tuple4<JobID, String, SavepointFormatType, TriggerSavepointMode>> diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java index afe38084018..a58cb12c334 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java @@ -24,10 +24,12 @@ import org.apache.flink.api.common.time.Time; import org.apache.flink.api.java.tuple.Tuple4; import org.apache.flink.api.java.tuple.Tuple5; import org.apache.flink.api.java.tuple.Tuple6; +import org.apache.flink.core.execution.CheckpointType; import org.apache.flink.core.execution.SavepointFormatType; import org.apache.flink.queryablestate.KvStateID; import org.apache.flink.runtime.blocklist.BlockedNode; import org.apache.flink.runtime.checkpoint.CheckpointMetrics; +import org.apache.flink.runtime.checkpoint.CompletedCheckpoint; import org.apache.flink.runtime.checkpoint.TaskStateSnapshot; import 
org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.clusterframework.types.ResourceID; @@ -133,7 +135,9 @@ public class TestingJobMasterGateway implements JobMasterGateway { private final TriFunction<String, Boolean, SavepointFormatType, CompletableFuture<String>> triggerSavepointFunction; - @Nonnull private final Supplier<CompletableFuture<String>> triggerCheckpointFunction; + @Nonnull + private final Function<CheckpointType, CompletableFuture<CompletedCheckpoint>> + triggerCheckpointFunction; @Nonnull private final TriFunction<String, Boolean, SavepointFormatType, CompletableFuture<String>> @@ -236,7 +240,9 @@ public class TestingJobMasterGateway implements JobMasterGateway { @Nonnull TriFunction<String, Boolean, SavepointFormatType, CompletableFuture<String>> triggerSavepointFunction, - @Nonnull Supplier<CompletableFuture<String>> triggerCheckpointFunction, + @Nonnull + Function<CheckpointType, CompletableFuture<CompletedCheckpoint>> + triggerCheckpointFunction, @Nonnull TriFunction<String, Boolean, SavepointFormatType, CompletableFuture<String>> stopWithSavepointFunction, @@ -412,8 +418,9 @@ public class TestingJobMasterGateway implements JobMasterGateway { } @Override - public CompletableFuture<String> triggerCheckpoint(Time timeout) { - return triggerCheckpointFunction.get(); + public CompletableFuture<CompletedCheckpoint> triggerCheckpoint( + CheckpointType checkpointType, Time timeout) { + return triggerCheckpointFunction.apply(checkpointType); } @Override diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGatewayBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGatewayBuilder.java index 4560cc38cd5..31b547061bb 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGatewayBuilder.java +++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGatewayBuilder.java @@ -22,10 +22,12 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.api.java.tuple.Tuple4; import org.apache.flink.api.java.tuple.Tuple5; import org.apache.flink.api.java.tuple.Tuple6; +import org.apache.flink.core.execution.CheckpointType; import org.apache.flink.core.execution.SavepointFormatType; import org.apache.flink.queryablestate.KvStateID; import org.apache.flink.runtime.blocklist.BlockedNode; import org.apache.flink.runtime.checkpoint.CheckpointMetrics; +import org.apache.flink.runtime.checkpoint.CompletedCheckpoint; import org.apache.flink.runtime.checkpoint.TaskStateSnapshot; import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.clusterframework.types.ResourceID; @@ -125,8 +127,8 @@ public class TestingJobMasterGatewayBuilder { targetDirectory != null ? targetDirectory : UUID.randomUUID().toString()); - private Supplier<CompletableFuture<String>> triggerCheckpointFunction = - () -> CompletableFuture.completedFuture(UUID.randomUUID().toString()); + private Function<CheckpointType, CompletableFuture<CompletedCheckpoint>> + triggerCheckpointFunction = (prop) -> new CompletableFuture<>(); private TriFunction<String, Boolean, SavepointFormatType, CompletableFuture<String>> stopWithSavepointFunction = (targetDirectory, ignoredB, formatType) -> @@ -285,7 +287,8 @@ public class TestingJobMasterGatewayBuilder { } public TestingJobMasterGatewayBuilder setTriggerCheckpointFunction( - Supplier<CompletableFuture<String>> triggerCheckpointFunction) { + Function<CheckpointType, CompletableFuture<CompletedCheckpoint>> + triggerCheckpointFunction) { this.triggerCheckpointFunction = triggerCheckpointFunction; return this; } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointHandlersTest.java 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointHandlersTest.java new file mode 100644 index 00000000000..8152a1ba08b --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointHandlersTest.java @@ -0,0 +1,345 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.rest.handler.job.checkpoints; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.core.execution.CheckpointType; +import org.apache.flink.runtime.dispatcher.UnknownOperationKeyException; +import org.apache.flink.runtime.messages.Acknowledge; +import org.apache.flink.runtime.rest.RestMatchers; +import org.apache.flink.runtime.rest.handler.HandlerRequest; +import org.apache.flink.runtime.rest.handler.HandlerRequestException; +import org.apache.flink.runtime.rest.handler.RestHandlerException; +import org.apache.flink.runtime.rest.handler.async.AsynchronousOperationResult; +import org.apache.flink.runtime.rest.handler.async.OperationResult; +import org.apache.flink.runtime.rest.handler.job.AsynchronousJobOperationKey; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.JobIDPathParameter; +import org.apache.flink.runtime.rest.messages.TriggerId; +import org.apache.flink.runtime.rest.messages.TriggerIdPathParameter; +import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointInfo; +import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointStatusMessageParameters; +import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointTriggerMessageParameters; +import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointTriggerRequestBody; +import org.apache.flink.runtime.rest.messages.queue.QueueStatus; +import org.apache.flink.runtime.webmonitor.RestfulGateway; +import org.apache.flink.runtime.webmonitor.TestingRestfulGateway; +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; +import org.apache.flink.util.TestLogger; +import org.apache.flink.util.concurrent.FutureUtils; + +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; + +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +import 
javax.annotation.Nullable; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicReference; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Test for {@link CheckpointHandlers}. */ +class CheckpointHandlersTest extends TestLogger { + + private static final Time TIMEOUT = Time.seconds(10); + + private static final JobID JOB_ID = new JobID(); + + private static final Long COMPLETED_CHECKPOINT_ID = 123456L; + + private static CheckpointHandlers.CheckpointTriggerHandler checkpointTriggerHandler; + + private static CheckpointHandlers.CheckpointStatusHandler checkpointStatusHandler; + + @BeforeAll + static void setUp() throws Exception { + GatewayRetriever<RestfulGateway> leaderRetriever = + () -> CompletableFuture.completedFuture(null); + + checkpointTriggerHandler = + new CheckpointHandlers.CheckpointTriggerHandler( + leaderRetriever, TIMEOUT, Collections.emptyMap()); + + checkpointStatusHandler = + new CheckpointHandlers.CheckpointStatusHandler( + leaderRetriever, TIMEOUT, Collections.emptyMap()); + } + + @Test + void testCheckpointTriggerCompletedSuccessfully() throws Exception { + final OperationResult<Long> successfulResult = + OperationResult.success(COMPLETED_CHECKPOINT_ID); + final CompletableFuture<CheckpointType> checkpointPropertiesFuture = + new CompletableFuture<>(); + + final AtomicReference<AsynchronousJobOperationKey> keyReference = new AtomicReference<>(); + final TestingRestfulGateway testingRestfulGateway = + new TestingRestfulGateway.Builder() + .setTriggerCheckpointFunction( + (AsynchronousJobOperationKey key, + CheckpointType checkpointType) -> { + keyReference.set(key); + checkpointPropertiesFuture.complete(checkpointType); + return CompletableFuture.completedFuture(Acknowledge.get()); + }) + .setGetCheckpointStatusFunction( + (AsynchronousJobOperationKey operationKey) -> { + if 
(operationKey.equals(keyReference.get())) { + return CompletableFuture.completedFuture(successfulResult); + } + throw new RuntimeException( + "Expected operation key " + + keyReference.get() + + ", but received " + + operationKey); + }) + .build(); + + final CheckpointType checkpointType = CheckpointType.FULL; + + final TriggerId triggerId = + checkpointTriggerHandler + .handleRequest( + triggerCheckpointRequest(checkpointType, null), + testingRestfulGateway) + .get() + .getTriggerId(); + + final AsynchronousOperationResult<CheckpointInfo> checkpointTriggerResponseBody = + checkpointStatusHandler + .handleRequest( + checkpointTriggerStatusRequest(triggerId), testingRestfulGateway) + .get(); + + assertThat(checkpointTriggerResponseBody.queueStatus().getId()) + .isEqualTo(QueueStatus.Id.COMPLETED); + assertThat(checkpointTriggerResponseBody.resource()).isNotNull(); + assertThat(checkpointTriggerResponseBody.resource().getCheckpointId()) + .isEqualTo(COMPLETED_CHECKPOINT_ID); + assertThat(checkpointPropertiesFuture.get()).isEqualTo(CheckpointType.FULL); + } + + @Test + void testTriggerCheckpointNoCheckpointType() throws Exception { + final OperationResult<Long> successfulResult = + OperationResult.success(COMPLETED_CHECKPOINT_ID); + final CompletableFuture<CheckpointType> checkpointTypeFuture = new CompletableFuture<>(); + + final AtomicReference<AsynchronousJobOperationKey> keyReference = new AtomicReference<>(); + final TestingRestfulGateway testingRestfulGateway = + new TestingRestfulGateway.Builder() + .setTriggerCheckpointFunction( + (AsynchronousJobOperationKey key, + CheckpointType checkpointType) -> { + keyReference.set(key); + checkpointTypeFuture.complete(checkpointType); + return CompletableFuture.completedFuture(Acknowledge.get()); + }) + .setGetCheckpointStatusFunction( + (AsynchronousJobOperationKey operationKey) -> { + if (operationKey.equals(keyReference.get())) { + return CompletableFuture.completedFuture(successfulResult); + } + throw new 
RuntimeException( + "Expected operation key " + + keyReference.get() + + ", but received " + + operationKey); + }) + .build(); + + final TriggerId triggerId = + checkpointTriggerHandler + .handleRequest(triggerCheckpointRequest(null, null), testingRestfulGateway) + .get() + .getTriggerId(); + + AsynchronousOperationResult<CheckpointInfo> checkpointTriggerResponseBody; + checkpointTriggerResponseBody = + checkpointStatusHandler + .handleRequest( + checkpointTriggerStatusRequest(triggerId), testingRestfulGateway) + .get(); + + assertThat(checkpointTriggerResponseBody.queueStatus().getId()) + .isEqualTo(QueueStatus.Id.COMPLETED); + assertThat(checkpointTriggerResponseBody.resource()).isNotNull(); + assertThat(checkpointTriggerResponseBody.resource().getCheckpointId()) + .isEqualTo(COMPLETED_CHECKPOINT_ID); + assertThat(checkpointTypeFuture.get()).isEqualTo(CheckpointType.DEFAULT); + } + + @Test + void testCheckpointCompletedWithException() throws Exception { + final OperationResult<Long> failedResult = + OperationResult.failure(new RuntimeException("expected")); + + final AtomicReference<AsynchronousJobOperationKey> keyReference = new AtomicReference<>(); + final TestingRestfulGateway testingRestfulGateway = + new TestingRestfulGateway.Builder() + .setTriggerCheckpointFunction( + (AsynchronousJobOperationKey key, + CheckpointType checkpointType) -> { + keyReference.set(key); + return CompletableFuture.completedFuture(Acknowledge.get()); + }) + .setGetCheckpointStatusFunction( + (AsynchronousJobOperationKey operationKey) -> { + if (operationKey.equals(keyReference.get())) { + return CompletableFuture.completedFuture(failedResult); + } + throw new RuntimeException( + "Expected operation key " + + keyReference.get() + + ", but received " + + operationKey); + }) + .build(); + + final TriggerId triggerId = + checkpointTriggerHandler + .handleRequest(triggerCheckpointRequest(null, null), testingRestfulGateway) + .get() + .getTriggerId(); + + 
AsynchronousOperationResult<CheckpointInfo> checkpointTriggerResponseBody; + checkpointTriggerResponseBody = + checkpointStatusHandler + .handleRequest( + checkpointTriggerStatusRequest(triggerId), testingRestfulGateway) + .get(); + + assertThat(checkpointTriggerResponseBody.queueStatus().getId()) + .isEqualTo(QueueStatus.Id.COMPLETED); + assertThat(checkpointTriggerResponseBody.resource()).isNotNull(); + assertThat(checkpointTriggerResponseBody.resource().getFailureCause()).isNotNull(); + + final Throwable checkpointError = + checkpointTriggerResponseBody + .resource() + .getFailureCause() + .deserializeError(ClassLoader.getSystemClassLoader()); + assertThat(checkpointError.getMessage()).matches("expected"); + assertThat(checkpointError).isInstanceOf(RuntimeException.class); + } + + @Test + void testProvidedTriggerId() throws Exception { + final OperationResult<Long> successfulResult = + OperationResult.success(COMPLETED_CHECKPOINT_ID); + final AtomicReference<AsynchronousJobOperationKey> keyReference = new AtomicReference<>(); + final TestingRestfulGateway testingRestfulGateway = + new TestingRestfulGateway.Builder() + .setTriggerCheckpointFunction( + (AsynchronousJobOperationKey key, + CheckpointType checkpointType) -> { + keyReference.set(key); + return CompletableFuture.completedFuture(Acknowledge.get()); + }) + .setGetCheckpointStatusFunction( + (AsynchronousJobOperationKey operationKey) -> { + if (operationKey.equals(keyReference.get())) { + return CompletableFuture.completedFuture(successfulResult); + } + throw new RuntimeException( + "Expected operation key " + + keyReference.get() + + ", but received " + + operationKey); + }) + .build(); + + final TriggerId providedTriggerId = new TriggerId(); + + final TriggerId returnedTriggerId = + checkpointTriggerHandler + .handleRequest( + triggerCheckpointRequest(CheckpointType.FULL, providedTriggerId), + testingRestfulGateway) + .get() + .getTriggerId(); + + 
assertThat(providedTriggerId).isEqualTo(returnedTriggerId); + + AsynchronousOperationResult<CheckpointInfo> checkpointTriggerResponseBody; + checkpointTriggerResponseBody = + checkpointStatusHandler + .handleRequest( + checkpointTriggerStatusRequest(providedTriggerId), + testingRestfulGateway) + .get(); + + assertThat(checkpointTriggerResponseBody.queueStatus().getId()) + .isEqualTo(QueueStatus.Id.COMPLETED); + assertThat(checkpointTriggerResponseBody.resource()).isNotNull(); + assertThat(checkpointTriggerResponseBody.resource().getCheckpointId()) + .isEqualTo(COMPLETED_CHECKPOINT_ID); + } + + @Test + void testQueryStatusOfUnknownOperationReturnsError() + throws HandlerRequestException, RestHandlerException { + + final TestingRestfulGateway testingRestfulGateway = + new TestingRestfulGateway.Builder() + .setGetCheckpointStatusFunction( + key -> + FutureUtils.completedExceptionally( + new UnknownOperationKeyException(key))) + .build(); + + final CompletableFuture<AsynchronousOperationResult<CheckpointInfo>> statusFuture = + checkpointStatusHandler.handleRequest( + checkpointTriggerStatusRequest(new TriggerId()), testingRestfulGateway); + + assertThat(statusFuture) + .matches(RestMatchers.respondsWithError(HttpResponseStatus.NOT_FOUND)::matches); + } + + private static HandlerRequest<CheckpointTriggerRequestBody> triggerCheckpointRequest( + final CheckpointType checkpointType, @Nullable final TriggerId triggerId) + throws HandlerRequestException { + return HandlerRequest.resolveParametersAndCreate( + new CheckpointTriggerRequestBody(checkpointType, triggerId), + new CheckpointTriggerMessageParameters(), + Collections.singletonMap(JobIDPathParameter.KEY, JOB_ID.toString()), + Collections.emptyMap(), + Collections.emptyList()); + } + + private static HandlerRequest<EmptyRequestBody> checkpointTriggerStatusRequest( + final TriggerId triggerId) throws HandlerRequestException { + final Map<String, String> pathParameters = new HashMap<>(); + 
pathParameters.put(JobIDPathParameter.KEY, JOB_ID.toString()); + pathParameters.put(TriggerIdPathParameter.KEY, triggerId.toString()); + + return HandlerRequest.resolveParametersAndCreate( + EmptyRequestBody.getInstance(), + new CheckpointStatusMessageParameters(), + pathParameters, + Collections.emptyMap(), + Collections.emptyList()); + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingSchedulerNG.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingSchedulerNG.java index 43386a2e705..c54511b7b3b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingSchedulerNG.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingSchedulerNG.java @@ -20,10 +20,12 @@ package org.apache.flink.runtime.scheduler; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.JobStatus; +import org.apache.flink.core.execution.CheckpointType; import org.apache.flink.core.execution.SavepointFormatType; import org.apache.flink.queryablestate.KvStateID; import org.apache.flink.runtime.accumulators.AccumulatorSnapshot; import org.apache.flink.runtime.checkpoint.CheckpointMetrics; +import org.apache.flink.runtime.checkpoint.CompletedCheckpoint; import org.apache.flink.runtime.checkpoint.TaskStateSnapshot; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; @@ -50,6 +52,7 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.util.concurrent.CompletableFuture; import java.util.function.Consumer; +import java.util.function.Function; import java.util.function.Supplier; /** Testing implementation of the {@link SchedulerNG}. 
*/ @@ -59,7 +62,8 @@ public class TestingSchedulerNG implements SchedulerNG { private final Supplier<CompletableFuture<Void>> closeAsyncSupplier; private final TriFunction<String, Boolean, SavepointFormatType, CompletableFuture<String>> triggerSavepointFunction; - private final Supplier<CompletableFuture<String>> triggerCheckpointFunction; + private final Function<CheckpointType, CompletableFuture<CompletedCheckpoint>> + triggerCheckpointFunction; private final Consumer<Throwable> handleGlobalFailureConsumer; private TestingSchedulerNG( @@ -68,7 +72,8 @@ public class TestingSchedulerNG implements SchedulerNG { Supplier<CompletableFuture<Void>> closeAsyncSupplier, TriFunction<String, Boolean, SavepointFormatType, CompletableFuture<String>> triggerSavepointFunction, - Supplier<CompletableFuture<String>> triggerCheckpointFunction, + Function<CheckpointType, CompletableFuture<CompletedCheckpoint>> + triggerCheckpointFunction, Consumer<Throwable> handleGlobalFailureConsumer) { this.jobTerminationFuture = jobTerminationFuture; this.startSchedulingRunnable = startSchedulingRunnable; @@ -181,8 +186,8 @@ public class TestingSchedulerNG implements SchedulerNG { } @Override - public CompletableFuture<String> triggerCheckpoint() { - return triggerCheckpointFunction.get(); + public CompletableFuture<CompletedCheckpoint> triggerCheckpoint(CheckpointType checkpointType) { + return triggerCheckpointFunction.apply(checkpointType); } @Override @@ -240,8 +245,8 @@ public class TestingSchedulerNG implements SchedulerNG { private TriFunction<String, Boolean, SavepointFormatType, CompletableFuture<String>> triggerSavepointFunction = (ignoredA, ignoredB, formatType) -> new CompletableFuture<>(); - private Supplier<CompletableFuture<String>> triggerCheckpointFunction = - CompletableFuture::new; + private Function<CheckpointType, CompletableFuture<CompletedCheckpoint>> + triggerCheckpointFunction = (ignored) -> new CompletableFuture<>(); private Consumer<Throwable> 
handleGlobalFailureConsumer = (ignored) -> {}; public Builder setJobTerminationFuture(CompletableFuture<JobStatus> jobTerminationFuture) { @@ -267,7 +272,8 @@ public class TestingSchedulerNG implements SchedulerNG { } public Builder setTriggerCheckpointFunction( - Supplier<CompletableFuture<String>> triggerCheckpointFunction) { + Function<CheckpointType, CompletableFuture<CompletedCheckpoint>> + triggerCheckpointFunction) { this.triggerCheckpointFunction = triggerCheckpointFunction; return this; } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingDispatcherGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingDispatcherGateway.java index 343002648af..7136620a907 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingDispatcherGateway.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingDispatcherGateway.java @@ -22,6 +22,7 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.JobStatus; import org.apache.flink.api.common.time.Time; import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.core.execution.CheckpointType; import org.apache.flink.core.execution.SavepointFormatType; import org.apache.flink.runtime.clusterframework.ApplicationStatus; import org.apache.flink.runtime.clusterframework.types.ResourceID; @@ -49,6 +50,7 @@ import org.apache.flink.util.function.TriFunction; import java.util.Collection; import java.util.Collections; import java.util.concurrent.CompletableFuture; +import java.util.function.BiFunction; import java.util.function.Function; import java.util.function.Supplier; @@ -127,6 +129,10 @@ public final class TestingDispatcherGateway extends TestingRestfulGateway Supplier<CompletableFuture<Collection<Tuple2<ResourceID, String>>>> requestTaskManagerMetricQueryServiceGatewaysSupplier, Supplier<CompletableFuture<ThreadDumpInfo>> requestThreadDumpSupplier, + 
BiFunction<AsynchronousJobOperationKey, CheckpointType, CompletableFuture<Acknowledge>> + triggerCheckpointFunction, + Function<AsynchronousJobOperationKey, CompletableFuture<OperationResult<Long>>> + getCheckpointStatusFunction, TriFunction< AsynchronousJobOperationKey, String, @@ -174,6 +180,8 @@ public final class TestingDispatcherGateway extends TestingRestfulGateway requestMetricQueryServiceAddressesSupplier, requestTaskManagerMetricQueryServiceGatewaysSupplier, requestThreadDumpSupplier, + triggerCheckpointFunction, + getCheckpointStatusFunction, triggerSavepointFunction, stopWithSavepointFunction, getSavepointStatusFunction, @@ -354,6 +362,8 @@ public final class TestingDispatcherGateway extends TestingRestfulGateway requestMetricQueryServiceGatewaysSupplier, requestTaskManagerMetricQueryServiceGatewaysSupplier, requestThreadDumpSupplier, + triggerCheckpointFunction, + getCheckpointStatusFunction, triggerSavepointFunction, triggerSavepointAndGetLocationFunction, stopWithSavepointFunction, diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingRestfulGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingRestfulGateway.java index ffd53db2b9b..997a1fe6865 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingRestfulGateway.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingRestfulGateway.java @@ -22,6 +22,7 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.JobStatus; import org.apache.flink.api.common.time.Time; import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.core.execution.CheckpointType; import org.apache.flink.core.execution.SavepointFormatType; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.dispatcher.TriggerSavepointMode; @@ -45,6 +46,7 @@ import org.apache.flink.util.function.TriFunction; import java.util.Collection; import 
java.util.Collections; import java.util.concurrent.CompletableFuture; +import java.util.function.BiFunction; import java.util.function.Function; import java.util.function.Supplier; @@ -85,6 +87,16 @@ public class TestingRestfulGateway implements RestfulGateway { () -> FutureUtils.completedExceptionally(new UnsupportedOperationException()); static final Supplier<CompletableFuture<Acknowledge>> DEFAULT_CLUSTER_SHUTDOWN_SUPPLIER = () -> CompletableFuture.completedFuture(Acknowledge.get()); + static final BiFunction< + AsynchronousJobOperationKey, CheckpointType, CompletableFuture<Acknowledge>> + DEFAULT_TRIGGER_CHECKPOINT_FUNCTION = + (AsynchronousJobOperationKey operationKey, CheckpointType checkpointType) -> + FutureUtils.completedExceptionally(new UnsupportedOperationException()); + static final Function<AsynchronousJobOperationKey, CompletableFuture<OperationResult<Long>>> + DEFAULT_GET_CHECKPOINT_STATUS_FUNCTION = + (AsynchronousJobOperationKey operationKey) -> + FutureUtils.completedExceptionally(new UnsupportedOperationException()); + static final TriFunction< AsynchronousJobOperationKey, String, @@ -152,6 +164,13 @@ public class TestingRestfulGateway implements RestfulGateway { protected Supplier<CompletableFuture<ThreadDumpInfo>> requestThreadDumpSupplier; + protected BiFunction< + AsynchronousJobOperationKey, CheckpointType, CompletableFuture<Acknowledge>> + triggerCheckpointFunction; + + protected Function<AsynchronousJobOperationKey, CompletableFuture<OperationResult<Long>>> + getCheckpointStatusFunction; + protected TriFunction< AsynchronousJobOperationKey, String, @@ -190,6 +209,8 @@ public class TestingRestfulGateway implements RestfulGateway { DEFAULT_REQUEST_METRIC_QUERY_SERVICE_PATHS_SUPPLIER, DEFAULT_REQUEST_TASK_MANAGER_METRIC_QUERY_SERVICE_PATHS_SUPPLIER, DEFAULT_REQUEST_THREAD_DUMP_SUPPLIER, + DEFAULT_TRIGGER_CHECKPOINT_FUNCTION, + DEFAULT_GET_CHECKPOINT_STATUS_FUNCTION, DEFAULT_TRIGGER_SAVEPOINT_FUNCTION, DEFAULT_STOP_WITH_SAVEPOINT_FUNCTION, 
DEFAULT_GET_SAVEPOINT_STATUS_FUNCTION, @@ -213,6 +234,10 @@ public class TestingRestfulGateway implements RestfulGateway { Supplier<CompletableFuture<Collection<Tuple2<ResourceID, String>>>> requestTaskManagerMetricQueryServiceAddressesSupplier, Supplier<CompletableFuture<ThreadDumpInfo>> requestThreadDumpSupplier, + BiFunction<AsynchronousJobOperationKey, CheckpointType, CompletableFuture<Acknowledge>> + triggerCheckpointFunction, + Function<AsynchronousJobOperationKey, CompletableFuture<OperationResult<Long>>> + getCheckpointStatusFunction, TriFunction< AsynchronousJobOperationKey, String, @@ -248,6 +273,8 @@ public class TestingRestfulGateway implements RestfulGateway { this.requestTaskManagerMetricQueryServiceAddressesSupplier = requestTaskManagerMetricQueryServiceAddressesSupplier; this.requestThreadDumpSupplier = requestThreadDumpSupplier; + this.triggerCheckpointFunction = triggerCheckpointFunction; + this.getCheckpointStatusFunction = getCheckpointStatusFunction; this.triggerSavepointFunction = triggerSavepointFunction; this.stopWithSavepointFunction = stopWithSavepointFunction; this.getSavepointStatusFunction = getSavepointStatusFunction; @@ -313,6 +340,18 @@ public class TestingRestfulGateway implements RestfulGateway { return null; } + @Override + public CompletableFuture<Acknowledge> triggerCheckpoint( + AsynchronousJobOperationKey operationKey, CheckpointType checkpointType, Time timeout) { + return triggerCheckpointFunction.apply(operationKey, checkpointType); + } + + @Override + public CompletableFuture<OperationResult<Long>> getTriggeredCheckpointStatus( + AsynchronousJobOperationKey operationKey) { + return getCheckpointStatusFunction.apply(operationKey); + } + @Override public CompletableFuture<Acknowledge> triggerSavepoint( AsynchronousJobOperationKey operationKey, @@ -383,6 +422,11 @@ public class TestingRestfulGateway implements RestfulGateway { requestTaskManagerMetricQueryServiceGatewaysSupplier; protected 
Supplier<CompletableFuture<ThreadDumpInfo>> requestThreadDumpSupplier; protected Supplier<CompletableFuture<Acknowledge>> clusterShutdownSupplier; + protected BiFunction< + AsynchronousJobOperationKey, CheckpointType, CompletableFuture<Acknowledge>> + triggerCheckpointFunction; + protected Function<AsynchronousJobOperationKey, CompletableFuture<OperationResult<Long>>> + getCheckpointStatusFunction; protected TriFunction< AsynchronousJobOperationKey, String, @@ -416,6 +460,8 @@ public class TestingRestfulGateway implements RestfulGateway { DEFAULT_REQUEST_METRIC_QUERY_SERVICE_PATHS_SUPPLIER; requestTaskManagerMetricQueryServiceGatewaysSupplier = DEFAULT_REQUEST_TASK_MANAGER_METRIC_QUERY_SERVICE_PATHS_SUPPLIER; + triggerCheckpointFunction = DEFAULT_TRIGGER_CHECKPOINT_FUNCTION; + getCheckpointStatusFunction = DEFAULT_GET_CHECKPOINT_STATUS_FUNCTION; triggerSavepointFunction = DEFAULT_TRIGGER_SAVEPOINT_FUNCTION; stopWithSavepointFunction = DEFAULT_STOP_WITH_SAVEPOINT_FUNCTION; getSavepointStatusFunction = DEFAULT_GET_SAVEPOINT_STATUS_FUNCTION; @@ -500,6 +546,23 @@ public class TestingRestfulGateway implements RestfulGateway { return self(); } + public T setTriggerCheckpointFunction( + BiFunction< + AsynchronousJobOperationKey, + CheckpointType, + CompletableFuture<Acknowledge>> + triggerCheckpointFunction) { + this.triggerCheckpointFunction = triggerCheckpointFunction; + return self(); + } + + public T setGetCheckpointStatusFunction( + Function<AsynchronousJobOperationKey, CompletableFuture<OperationResult<Long>>> + getCheckpointStatusFunction) { + this.getCheckpointStatusFunction = getCheckpointStatusFunction; + return self(); + } + public T setTriggerSavepointFunction( TriFunction< AsynchronousJobOperationKey, @@ -569,6 +632,8 @@ public class TestingRestfulGateway implements RestfulGateway { requestMetricQueryServiceGatewaysSupplier, requestTaskManagerMetricQueryServiceGatewaysSupplier, requestThreadDumpSupplier, + triggerCheckpointFunction, + 
getCheckpointStatusFunction, triggerSavepointFunction, stopWithSavepointFunction, getSavepointStatusFunction,