[ https://issues.apache.org/jira/browse/FLINK-8317?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16324003#comment-16324003 ]
ASF GitHub Bot commented on FLINK-8317: --------------------------------------- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5223#discussion_r161224329 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/savepoints/SavepointHandlers.java --- @@ -0,0 +1,337 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.rest.handler.job.savepoints; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.CoreOptions; +import org.apache.flink.runtime.checkpoint.CompletedCheckpoint; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.rest.NotFoundException; +import org.apache.flink.runtime.rest.handler.AbstractRestHandler; +import org.apache.flink.runtime.rest.handler.HandlerRequest; +import org.apache.flink.runtime.rest.handler.RestHandlerException; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.JobIDPathParameter; +import org.apache.flink.runtime.rest.messages.SavepointTriggerIdPathParameter; +import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointInfo; +import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointResponseBody; +import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointStatusHeaders; +import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointStatusMessageParameters; +import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerHeaders; +import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerId; +import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerMessageParameters; +import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerRequestBody; +import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerResponseBody; +import org.apache.flink.runtime.rest.messages.queue.QueueStatus; +import org.apache.flink.runtime.rpc.RpcUtils; +import org.apache.flink.runtime.webmonitor.RestfulGateway; +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; +import org.apache.flink.types.Either; +import org.apache.flink.util.SerializedThrowable; + +import 
org.apache.flink.shaded.guava18.com.google.common.cache.Cache; +import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import javax.annotation.concurrent.Immutable; +import javax.annotation.concurrent.ThreadSafe; + +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; + +import static java.util.Objects.requireNonNull; + +/** + * HTTP handlers for asynchronous triggering of savepoints. + * + * <p>Drawing savepoints is a potentially long-running operation. To avoid blocking HTTP + * connections, savepoints must be drawn in two steps. First, an HTTP request is issued to trigger + * the savepoint asynchronously. The request will be assigned a {@link SavepointTriggerId}, + * which is returned in the response body. Next, the returned id should be used to poll the status + * of the savepoint until it is finished. + * + * <p>A savepoint is triggered by sending an HTTP {@code POST} request to + * {@code /jobs/:jobid/savepoints}. The HTTP request may contain a JSON body to specify the target + * directory of the savepoint, e.g., + * <pre> + * { "target-directory": "/tmp" } + * </pre> + * If the body is omitted, or the field {@code target-directory} is {@code null}, the default + * savepoint directory as specified by {@link CoreOptions#SAVEPOINT_DIRECTORY} will be used. + * As written above, the response will contain a request id, e.g., + * <pre> + * { "request-id": "7d273f5a62eb4730b9dea8e833733c1e" } + * </pre> + * + * <p>To poll for the status of an ongoing savepoint, an HTTP {@code GET} request is issued to + * {@code /jobs/:jobid/savepoints/:savepointtriggerid}. 
If the specified savepoint is still ongoing, + * the response will be + * <pre> + * { + * "status": { + * "id": "IN_PROGRESS" + * } + * } + * </pre> + * If the specified savepoint has completed, the status id will transition to {@code COMPLETED}, and + * the response will additionally contain information about the savepoint, such as the location: + * <pre> + * { + * "status": { + * "id": "COMPLETED" + * }, + * "savepoint": { + * "request-id": "7d273f5a62eb4730b9dea8e833733c1e", + * "location": "/tmp/savepoint-d9813b-8a68e674325b" + * } + * } + * </pre> + */ +public class SavepointHandlers { + + private final CompletedCheckpointCache completedCheckpointCache = new CompletedCheckpointCache(); + + @Nullable + private String defaultSavepointDir; + + public SavepointHandlers(@Nullable final String defaultSavepointDir) { + this.defaultSavepointDir = defaultSavepointDir; + } + + /** + * HTTP handler to trigger savepoints. + */ + public class SavepointTriggerHandler + extends AbstractRestHandler<RestfulGateway, SavepointTriggerRequestBody, SavepointTriggerResponseBody, SavepointTriggerMessageParameters> { + + public SavepointTriggerHandler( + final CompletableFuture<String> localRestAddress, + final GatewayRetriever<? 
extends RestfulGateway> leaderRetriever, + final Time timeout, + final Map<String, String> responseHeaders) { + super(localRestAddress, leaderRetriever, timeout, responseHeaders, SavepointTriggerHeaders.getInstance()); + } + + @Override + protected CompletableFuture<SavepointTriggerResponseBody> handleRequest( + @Nonnull final HandlerRequest<SavepointTriggerRequestBody, SavepointTriggerMessageParameters> request, + @Nonnull final RestfulGateway gateway) throws RestHandlerException { + + final JobID jobId = request.getPathParameter(JobIDPathParameter.class); + final String requestedTargetDirectory = request.getRequestBody().getTargetDirectory(); + + if (requestedTargetDirectory == null && defaultSavepointDir == null) { + return FutureUtils.completedExceptionally( + new RestHandlerException( + String.format("Config key [%s] is not set. Property [%s] must be provided.", + CoreOptions.SAVEPOINT_DIRECTORY.key(), + SavepointTriggerRequestBody.FIELD_NAME_TARGET_DIRECTORY), + HttpResponseStatus.BAD_REQUEST)); + } + + final String targetDirectory = requestedTargetDirectory != null ? requestedTargetDirectory : defaultSavepointDir; + final CompletableFuture<CompletedCheckpoint> completedCheckpointCompletableFuture = + gateway.triggerSavepoint(jobId, targetDirectory, RpcUtils.INF_TIMEOUT); + final SavepointTriggerId savepointTriggerId = new SavepointTriggerId(); + completedCheckpointCache.registerOngoingCheckpoint( + SavepointKey.of(savepointTriggerId, jobId), + completedCheckpointCompletableFuture); + return CompletableFuture.completedFuture( + new SavepointTriggerResponseBody(savepointTriggerId)); + } + } + + /** + * HTTP handler to query for the status of the savepoint. + */ + public class SavepointStatusHandler + extends AbstractRestHandler<RestfulGateway, EmptyRequestBody, SavepointResponseBody, SavepointStatusMessageParameters> { + + public SavepointStatusHandler( + final CompletableFuture<String> localRestAddress, + final GatewayRetriever<? 
extends RestfulGateway> leaderRetriever, + final Time timeout, + final Map<String, String> responseHeaders) { + super(localRestAddress, leaderRetriever, timeout, responseHeaders, SavepointStatusHeaders.getInstance()); + } + + @Override + protected CompletableFuture<SavepointResponseBody> handleRequest( + @Nonnull final HandlerRequest<EmptyRequestBody, SavepointStatusMessageParameters> request, + @Nonnull final RestfulGateway gateway) throws RestHandlerException { + + final JobID jobId = request.getPathParameter(JobIDPathParameter.class); + final SavepointTriggerId savepointTriggerId = request.getPathParameter( + SavepointTriggerIdPathParameter.class); + final Either<Throwable, CompletedCheckpoint> completedCheckpointOrError; + try { + completedCheckpointOrError = completedCheckpointCache.get(SavepointKey.of( + savepointTriggerId, jobId)); + } catch (UnknownSavepointTriggerId e) { + return FutureUtils.completedExceptionally( + new NotFoundException("Savepoint not found. Savepoint trigger id: " + + savepointTriggerId + ", job id: " + jobId)); + } + + if (completedCheckpointOrError != null) { + if (completedCheckpointOrError.isLeft()) { + return CompletableFuture.completedFuture(new SavepointResponseBody( + QueueStatus.completed(), + new SavepointInfo(savepointTriggerId, null, new SerializedThrowable( + completedCheckpointOrError.left())))); + } else { + final CompletedCheckpoint completedCheckpoint = completedCheckpointOrError.right(); + final String externalPointer = completedCheckpoint.getExternalPointer(); + return CompletableFuture.completedFuture(new SavepointResponseBody( + QueueStatus.completed(), + new SavepointInfo(savepointTriggerId, externalPointer, null))); + } + } else { + return CompletableFuture.completedFuture(SavepointResponseBody.inProgress()); + } + } + } + + /** + * Cache to manage ongoing checkpoints. + * + * <p>The cache allows to register an ongoing checkpoint in the form of a + * {@code CompletableFuture<CompletedCheckpoint>}. 
Completed checkpoints will be removed from + * the cache automatically after a fixed timeout. + */ + @ThreadSafe + static class CompletedCheckpointCache { + + private static final long COMPLETED_CHECKPOINTS_CACHE_DURATION_SECONDS = 300; + + /** + * Stores SavepointKeys of ongoing checkpoints. + * If the checkpoint completes, it will be moved to {@link #completedCheckpoints}. + */ + private final Set<SavepointKey> registeredSavepointTriggers = ConcurrentHashMap.newKeySet(); + + /** Caches completed checkpoints. */ + private final Cache<SavepointKey, Either<Throwable, CompletedCheckpoint>> completedCheckpoints = + CacheBuilder.newBuilder() + .expireAfterWrite(COMPLETED_CHECKPOINTS_CACHE_DURATION_SECONDS, TimeUnit.SECONDS) + .build(); + + /** + * Registers an ongoing checkpoint with the cache. + */ + void registerOngoingCheckpoint( + final SavepointKey savepointTriggerId, + final CompletableFuture<CompletedCheckpoint> checkpointFuture) { + registeredSavepointTriggers.add(savepointTriggerId); + checkpointFuture.whenComplete((completedCheckpoint, error) -> { + if (error == null) { + completedCheckpoints.put(savepointTriggerId, Either.Right(completedCheckpoint)); + } else { + completedCheckpoints.put(savepointTriggerId, Either.Left(error)); + } + registeredSavepointTriggers.remove(savepointTriggerId); + }); + } + + /** + * Returns the CompletedCheckpoint or a Throwable if the CompletableFuture finished, + * otherwise {@code null}. + * + * @throws UnknownSavepointTriggerId If the savepoint is not found, and there is no ongoing + * checkpoint under the provided key. 
+ */ + @Nullable + Either<Throwable, CompletedCheckpoint> get( + final SavepointKey savepointTriggerId) throws UnknownSavepointTriggerId { + Either<Throwable, CompletedCheckpoint> completedCheckpointOrError = null; + if (!registeredSavepointTriggers.contains(savepointTriggerId) + && (completedCheckpointOrError = completedCheckpoints.getIfPresent(savepointTriggerId)) == null) { + throw new UnknownSavepointTriggerId(); + } + return completedCheckpointOrError; + } + } + + /** + * A pair of {@link JobID} and {@link SavepointTriggerId} used as a key to a hash based + * collection. + * + * @see CompletedCheckpointCache + */ + @Immutable + static class SavepointKey { + + private final SavepointTriggerId savepointTriggerId; + + private final JobID jobId; + + private SavepointKey(final SavepointTriggerId savepointTriggerId, final JobID jobId) { + this.savepointTriggerId = requireNonNull(savepointTriggerId); + this.jobId = requireNonNull(jobId); + } + + private static SavepointKey of(final SavepointTriggerId savepointTriggerId, final JobID jobId) { + return new SavepointKey(savepointTriggerId, jobId); + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + final SavepointKey that = (SavepointKey) o; + + if (!savepointTriggerId.equals(that.savepointTriggerId)) { + return false; + } + return jobId.equals(that.jobId); + } + + @Override + public int hashCode() { + int result = savepointTriggerId.hashCode(); + result = 31 * result + jobId.hashCode(); + return result; + } + } + + /** + * Exception that indicates that there is no ongoing or completed checkpoint for a given + * {@link JobID} and {@link SavepointTriggerId} pair. + */ + static class UnknownSavepointTriggerId extends Exception { --- End diff -- Could extend `FlinkException`. 
> Enable Triggering of Savepoints via RestfulGateway > -------------------------------------------------- > > Key: FLINK-8317 > URL: https://issues.apache.org/jira/browse/FLINK-8317 > Project: Flink > Issue Type: New Feature > Components: Distributed Coordination, REST > Affects Versions: 1.5.0 > Reporter: Gary Yao > Assignee: Gary Yao > Labels: flip-6 > Fix For: 1.5.0 > > > Enable triggering of savepoints in FLIP-6 mode via RestfulGateway: > * Add the method {{CompletableFuture<CompletedCheckpoint> > triggerSavepoint(long timestamp, String targetDirectory)}} to the > {{RestfulGateway}} interface > * Implement the method in {{Dispatcher}} and {{JobMaster}} > * Implement a new {{AbstractRestHandler}} which allows asynchronous > triggering of savepoints -- This message was sent by Atlassian JIRA (v6.4.14#64029)