Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5223#discussion_r161224559 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/savepoints/SavepointHandlers.java --- @@ -0,0 +1,337 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.rest.handler.job.savepoints; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.CoreOptions; +import org.apache.flink.runtime.checkpoint.CompletedCheckpoint; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.rest.NotFoundException; +import org.apache.flink.runtime.rest.handler.AbstractRestHandler; +import org.apache.flink.runtime.rest.handler.HandlerRequest; +import org.apache.flink.runtime.rest.handler.RestHandlerException; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.JobIDPathParameter; +import org.apache.flink.runtime.rest.messages.SavepointTriggerIdPathParameter; +import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointInfo; +import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointResponseBody; +import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointStatusHeaders; +import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointStatusMessageParameters; +import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerHeaders; +import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerId; +import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerMessageParameters; +import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerRequestBody; +import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerResponseBody; +import org.apache.flink.runtime.rest.messages.queue.QueueStatus; +import org.apache.flink.runtime.rpc.RpcUtils; +import org.apache.flink.runtime.webmonitor.RestfulGateway; +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; +import org.apache.flink.types.Either; +import org.apache.flink.util.SerializedThrowable; + +import 
org.apache.flink.shaded.guava18.com.google.common.cache.Cache; +import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import javax.annotation.concurrent.Immutable; +import javax.annotation.concurrent.ThreadSafe; + +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; + +import static java.util.Objects.requireNonNull; + +/** + * HTTP handlers for asynchronous triggering of savepoints. + * + * <p>Drawing savepoints is a potentially long-running operation. To avoid blocking HTTP + * connections, savepoints must be drawn in two steps. First, an HTTP request is issued to trigger + * the savepoint asynchronously. The request will be assigned a {@link SavepointTriggerId}, + * which is returned in the response body. Next, the returned id should be used to poll the status + * of the savepoint until it is finished. + * + * <p>A savepoint is triggered by sending an HTTP {@code POST} request to + * {@code /jobs/:jobid/savepoints}. The HTTP request may contain a JSON body to specify the target + * directory of the savepoint, e.g., + * <pre> + * { "target-directory": "/tmp" } + * </pre> + * If the body is omitted, or the field {@code target-directory} is {@code null}, the default + * savepoint directory as specified by {@link CoreOptions#SAVEPOINT_DIRECTORY} will be used. + * As written above, the response will contain a request id, e.g., + * <pre> + * { "request-id": "7d273f5a62eb4730b9dea8e833733c1e" } + * </pre> + * + * <p>To poll for the status of an ongoing savepoint, an HTTP {@code GET} request is issued to + * {@code /jobs/:jobid/savepoints/:savepointtriggerid}. 
If the specified savepoint is still ongoing, + * the response will be + * <pre> + * { + * "status": { + * "id": "IN_PROGRESS" + * } + * } + * </pre> + * If the specified savepoint has completed, the status id will transition to {@code COMPLETED}, and + * the response will additionally contain information about the savepoint, such as the location: + * <pre> + * { + * "status": { + * "id": "COMPLETED" + * }, + * "savepoint": { + * "request-id": "7d273f5a62eb4730b9dea8e833733c1e", + * "location": "/tmp/savepoint-d9813b-8a68e674325b" + * } + * } + * </pre> + */ +public class SavepointHandlers { + + private final CompletedCheckpointCache completedCheckpointCache = new CompletedCheckpointCache(); + + @Nullable + private String defaultSavepointDir; --- End diff -- Could we make this final?
---