Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5223#discussion_r161236268
  
    --- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/savepoints/SavepointHandlers.java
 ---
    @@ -0,0 +1,337 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.rest.handler.job.savepoints;
    +
    +import org.apache.flink.annotation.VisibleForTesting;
    +import org.apache.flink.api.common.JobID;
    +import org.apache.flink.api.common.time.Time;
    +import org.apache.flink.configuration.CoreOptions;
    +import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
    +import org.apache.flink.runtime.concurrent.FutureUtils;
    +import org.apache.flink.runtime.rest.NotFoundException;
    +import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
    +import org.apache.flink.runtime.rest.handler.HandlerRequest;
    +import org.apache.flink.runtime.rest.handler.RestHandlerException;
    +import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
    +import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
    +import 
org.apache.flink.runtime.rest.messages.SavepointTriggerIdPathParameter;
    +import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointInfo;
    +import 
org.apache.flink.runtime.rest.messages.job.savepoints.SavepointResponseBody;
    +import 
org.apache.flink.runtime.rest.messages.job.savepoints.SavepointStatusHeaders;
    +import 
org.apache.flink.runtime.rest.messages.job.savepoints.SavepointStatusMessageParameters;
    +import 
org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerHeaders;
    +import 
org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerId;
    +import 
org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerMessageParameters;
    +import 
org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerRequestBody;
    +import 
org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerResponseBody;
    +import org.apache.flink.runtime.rest.messages.queue.QueueStatus;
    +import org.apache.flink.runtime.rpc.RpcUtils;
    +import org.apache.flink.runtime.webmonitor.RestfulGateway;
    +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
    +import org.apache.flink.types.Either;
    +import org.apache.flink.util.SerializedThrowable;
    +
    +import org.apache.flink.shaded.guava18.com.google.common.cache.Cache;
    +import 
org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder;
    +import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
    +
    +import javax.annotation.Nonnull;
    +import javax.annotation.Nullable;
    +import javax.annotation.concurrent.Immutable;
    +import javax.annotation.concurrent.ThreadSafe;
    +
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.concurrent.CompletableFuture;
    +import java.util.concurrent.ConcurrentHashMap;
    +import java.util.concurrent.TimeUnit;
    +
    +import static java.util.Objects.requireNonNull;
    +
    +/**
    + * HTTP handlers for asynchronous triggering of savepoints.
    + *
    + * <p>Drawing savepoints is a potentially long-running operation. To avoid 
blocking HTTP
    + * connections, savepoints must be drawn in two steps. First, an HTTP 
request is issued to trigger
    + * the savepoint asynchronously. The request will be assigned a {@link 
SavepointTriggerId},
    + * which is returned in the response body. Next, the returned id should be 
used to poll the status
    + * of the savepoint until it is finished.
    + *
    + * <p>A savepoint is triggered by sending an HTTP {@code POST} request to
    + * {@code /jobs/:jobid/savepoints}. The HTTP request may contain a JSON 
body to specify the target
    + * directory of the savepoint, e.g.,
    + * <pre>
    + * { "target-directory": "/tmp" }
    + * </pre>
    + * If the body is omitted, or the field {@code target-property} is {@code 
null}, the default
    + * savepoint directory as specified by {@link 
CoreOptions#SAVEPOINT_DIRECTORY} will be used.
    + * As written above, the response will contain a request id, e.g.,
    + * <pre>
    + * { "request-id": "7d273f5a62eb4730b9dea8e833733c1e" }
    + * </pre>
    + *
    + * <p>To poll for the status of an ongoing savepoint, an HTTP {@code GET} 
request is issued to
    + * {@code /jobs/:jobid/savepoints/:savepointtriggerid}. If the specified 
savepoint is still ongoing,
    + * the response will be
    + * <pre>
    + * {
    + *     "status": {
    + *         "id": "IN_PROGRESS"
    + *     }
    + * }
    + * </pre>
    + * If the specified savepoint has completed, the status id will transition 
to {@code COMPLETED}, and
    + * the response will additionally contain information about the savepoint, 
such as the location:
    + * <pre>
    + * {
    + *     "status": {
    + *         "id": "COMPLETED"
    + *     },
    + *     "savepoint": {
    + *         "request-id": "7d273f5a62eb4730b9dea8e833733c1e",
    + *         "location": "/tmp/savepoint-d9813b-8a68e674325b"
    + *     }
    + * }
    + * </pre>
    + */
    +public class SavepointHandlers {
    +
    +   private final CompletedCheckpointCache completedCheckpointCache = new 
CompletedCheckpointCache();
    +
    +   @Nullable
    +   private String defaultSavepointDir;
    --- End diff --
    
    Could do but additional effort required due to:
    ```
        @VisibleForTesting
        void setDefaultSavepointDir(@Nullable final String defaultSavepointDir) 
{
                this.defaultSavepointDir = defaultSavepointDir;
        }
    ```


---

Reply via email to