[FLINK-4735] [cluster management] Implements some job execution related RPC calls on the JobManager
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/041dfd78 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/041dfd78 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/041dfd78 Branch: refs/heads/flip-6 Commit: 041dfd78a70a45a2b697029c8c1e914050ffee91 Parents: cef3191 Author: Kurt Young <ykt...@gmail.com> Authored: Tue Oct 4 23:00:22 2016 +0800 Committer: Stephan Ewen <se...@apache.org> Committed: Fri Oct 14 15:14:43 2016 +0200 ---------------------------------------------------------------------- .../flink/runtime/jobmaster/JobMaster.java | 246 +++++++++++++++++-- .../runtime/jobmaster/JobMasterGateway.java | 93 ++++++- .../jobmaster/message/ClassloadingProps.java | 68 +++++ .../message/DisposeSavepointResponse.java | 49 ++++ .../message/TriggerSavepointResponse.java | 74 ++++++ 5 files changed, 507 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/041dfd78/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java index 8f3a342..3b8fc97 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java @@ -28,6 +28,7 @@ import org.apache.flink.core.io.InputSplit; import org.apache.flink.core.io.InputSplitAssigner; import org.apache.flink.metrics.MetricGroup; import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; +import org.apache.flink.runtime.checkpoint.CheckpointCoordinator; import org.apache.flink.runtime.checkpoint.CheckpointIDCounter; import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory; import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore; @@ -39,8 +40,11 @@ import org.apache.flink.runtime.client.JobExecutionException; import org.apache.flink.runtime.client.JobSubmissionException; import org.apache.flink.runtime.client.SerializedJobExecutionResult; import org.apache.flink.runtime.clusterframework.types.ResourceID; -import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.concurrent.BiFunction; import org.apache.flink.runtime.concurrent.Future; +import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture; +import org.apache.flink.runtime.concurrent.impl.FlinkFuture; +import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager; import org.apache.flink.runtime.executiongraph.Execution; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; @@ -61,10 +65,20 @@ import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator; import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings; import org.apache.flink.runtime.jobmanager.OnCompletionActions; import org.apache.flink.runtime.jobmanager.scheduler.Scheduler; +import org.apache.flink.runtime.jobmaster.message.ClassloadingProps; +import org.apache.flink.runtime.jobmaster.message.DisposeSavepointResponse; +import org.apache.flink.runtime.jobmaster.message.NextInputSplit; +import org.apache.flink.runtime.jobmaster.message.TriggerSavepointResponse; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; -import org.apache.flink.runtime.messages.Acknowledge; +import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint; +import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint; import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup; +import org.apache.flink.runtime.query.KvStateID; +import org.apache.flink.runtime.query.KvStateLocation; +import org.apache.flink.runtime.query.KvStateLocationRegistry; +import org.apache.flink.runtime.query.KvStateServerAddress; +import org.apache.flink.runtime.query.UnknownKvStateLocation; import org.apache.flink.runtime.registration.RegisteredRpcConnection; import org.apache.flink.runtime.registration.RegistrationResponse; import org.apache.flink.runtime.registration.RetryingRegistration; @@ -72,7 +86,7 @@ import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; import org.apache.flink.runtime.rpc.RpcEndpoint; import org.apache.flink.runtime.rpc.RpcMethod; import org.apache.flink.runtime.rpc.RpcService; -import org.apache.flink.runtime.state.CheckpointStateHandles; +import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.taskmanager.TaskExecutionState; import org.apache.flink.runtime.util.SerializedThrowable; import org.apache.flink.util.InstantiationUtil; @@ -520,22 +534,6 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> { throw new ExecutionGraphException("The execution attempt " + taskExecutionState.getID() + " was not found."); } - - } - - //----------------------------------------------------------------------------------------------⨠- // Internal methods⨠- // ----------------------------------------------------------------------------------------------â¨â¨ - - private void handleFatalError(final Throwable cause) { - runAsync(new Runnable() { - @Override - public void run() { - log.error("Fatal error occurred on JobManager, cause: {}", cause.getMessage(), cause); - shutDown(); - jobCompletionActions.onFatalError(cause); - } - }); } @RpcMethod @@ -631,10 +629,220 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> { throw new UnsupportedOperationException(); } + @RpcMethod + public void resourceRemoved(final ResourceID resourceId, final String message) { + // TODO: remove resource from slot pool + } + + @RpcMethod + public void acknowledgeCheckpoint(final AcknowledgeCheckpoint acknowledge) { + if (executionGraph != null) { + final CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator(); + if (checkpointCoordinator != null) { + getRpcService().execute(new Runnable() { + @Override + public void run() { + try { + if (!checkpointCoordinator.receiveAcknowledgeMessage(acknowledge)) { + log.info("Received message for non-existing checkpoint {}.", + acknowledge.getCheckpointId()); + } + } catch (Exception e) { + log.error("Error in CheckpointCoordinator while processing {}", acknowledge, e); + } + } + }); + } + else { + log.error("Received AcknowledgeCheckpoint message for job {} with no CheckpointCoordinator", + jobGraph.getJobID()); + } + } else { + log.error("Received AcknowledgeCheckpoint for unavailable job {}", jobGraph.getJobID()); + } + } + + @RpcMethod + public void declineCheckpoint(final DeclineCheckpoint decline) { + if (executionGraph != null) { + final CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator(); + if (checkpointCoordinator != null) { + getRpcService().execute(new Runnable() { + @Override + public void run() { + try { + if (!checkpointCoordinator.receiveDeclineMessage(decline)) { + log.info("Received message for non-existing checkpoint {}.", decline.getCheckpointId()); + } + } catch (Exception e) { + log.error("Error in CheckpointCoordinator while processing {}", decline, e); + } + } + }); + } else { + log.error("Received DeclineCheckpoint message for job {} with no CheckpointCoordinator", + jobGraph.getJobID()); + } + } else { + log.error("Received AcknowledgeCheckpoint for unavailable job {}", jobGraph.getJobID()); + } + } + + @RpcMethod + public KvStateLocation lookupKvStateLocation(final String registrationName) throws Exception { + if (executionGraph != null) { + if (log.isDebugEnabled()) { + log.debug("Lookup key-value state for job {} with registration " + + "name {}.", jobGraph.getJobID(), registrationName); + } + + final KvStateLocationRegistry registry = executionGraph.getKvStateLocationRegistry(); + final KvStateLocation location = registry.getKvStateLocation(registrationName); + if (location != null) { + return location; + } else { + throw new UnknownKvStateLocation(registrationName); + } + } else { + throw new IllegalStateException("Received lookup KvState location request for unavailable job " + + jobGraph.getJobID()); + } + } + + @RpcMethod + public void notifyKvStateRegistered( + final JobVertexID jobVertexId, + final KeyGroupRange keyGroupRange, + final String registrationName, + final KvStateID kvStateId, + final KvStateServerAddress kvStateServerAddress) + { + if (executionGraph != null) { + if (log.isDebugEnabled()) { + log.debug("Key value state registered for job {} under name {}.", + jobGraph.getJobID(), registrationName); + } + try { + executionGraph.getKvStateLocationRegistry().notifyKvStateRegistered( + jobVertexId, keyGroupRange, registrationName, kvStateId, kvStateServerAddress + ); + } catch (Exception e) { + log.error("Failed to notify KvStateRegistry about registration {}.", registrationName); + } + } else { + log.error("Received notify KvState registered request for unavailable job " + jobGraph.getJobID()); + } + } + + @RpcMethod + public void notifyKvStateUnregistered( + JobVertexID jobVertexId, + KeyGroupRange keyGroupRange, + String registrationName) + { + if (executionGraph != null) { + if (log.isDebugEnabled()) { + log.debug("Key value state unregistered for job {} under name {}.", + jobGraph.getJobID(), registrationName); + } + try { + executionGraph.getKvStateLocationRegistry().notifyKvStateUnregistered( + jobVertexId, keyGroupRange, registrationName + ); + } catch (Exception e) { + log.error("Failed to notify KvStateRegistry about registration {}.", registrationName); + } + } else { + log.error("Received notify KvState unregistered request for unavailable job " + jobGraph.getJobID()); + } + } + + @RpcMethod + public Future<TriggerSavepointResponse> triggerSavepoint() throws Exception { + if (executionGraph != null) { + final CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator(); + if (checkpointCoordinator != null) { + try { + Future<String> savepointFuture = new FlinkFuture<>( + checkpointCoordinator.triggerSavepoint(System.currentTimeMillis())); + + return savepointFuture.handleAsync(new BiFunction<String, Throwable, TriggerSavepointResponse>() { + @Override + public TriggerSavepointResponse apply(String savepointPath, Throwable throwable) { + if (throwable == null) { + return new TriggerSavepointResponse.Success(jobGraph.getJobID(), savepointPath); + } + else { + return new TriggerSavepointResponse.Failure(jobGraph.getJobID(), + new Exception("Failed to complete savepoint", throwable)); + } + } + }, getMainThreadExecutor()); + + } catch (Exception e) { + FlinkCompletableFuture<TriggerSavepointResponse> future = new FlinkCompletableFuture<>(); + future.complete(new TriggerSavepointResponse.Failure(jobGraph.getJobID(), + new Exception("Failed to trigger savepoint", e))); + return future; + } + } else { + FlinkCompletableFuture<TriggerSavepointResponse> future = new FlinkCompletableFuture<>(); + future.complete(new TriggerSavepointResponse.Failure(jobGraph.getJobID(), + new IllegalStateException("Checkpointing disabled. You can enable it via the execution " + + "environment of your job."))); + return future; + } + } else { + FlinkCompletableFuture<TriggerSavepointResponse> future = new FlinkCompletableFuture<>(); + future.complete(new TriggerSavepointResponse.Failure(jobGraph.getJobID(), + new IllegalArgumentException("Received trigger savepoint request for unavailable job " + + jobGraph.getJobID()))); + return future; + } + } + + @RpcMethod + public DisposeSavepointResponse disposeSavepoint(final String savepointPath) { + try { + log.info("Disposing savepoint at {}.", savepointPath); + + // check whether the savepoint exists + savepointStore.loadSavepoint(savepointPath); + + savepointStore.disposeSavepoint(savepointPath); + return new DisposeSavepointResponse.Success(); + } catch (Exception e) { + log.error("Failed to dispose savepoint at {}.", savepointPath, e); + return new DisposeSavepointResponse.Failure(e); + } + } + + @RpcMethod + public ClassloadingProps requestClassloadingProps() throws Exception { + if (executionGraph != null) { + return new ClassloadingProps(libraryCacheManager.getBlobServerPort(), + executionGraph.getRequiredJarFiles(), + executionGraph.getRequiredClasspaths()); + } else { + throw new Exception("Received classloading props request for unavailable job " + jobGraph.getJobID()); + } + } + //---------------------------------------------------------------------------------------------- // Internal methods //---------------------------------------------------------------------------------------------- + private void handleFatalError(final Throwable cause) { + runAsync(new Runnable() { + @Override + public void run() { + log.error("Fatal error occurred on JobManager, cause: {}", cause.getMessage(), cause); + shutDown(); + jobCompletionActions.onFatalError(cause); + } + }); + } + // TODO - wrap this as StatusListenerMessenger's callback with rpc main thread private void jobStatusChanged(final JobStatus newJobStatus, long timestamp, final Throwable error) { final JobID jobID = executionGraph.getJobID(); http://git-wip-us.apache.org/repos/asf/flink/blob/041dfd78/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java index e3e57d4..4b51258 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java @@ -18,8 +18,6 @@ package org.apache.flink.runtime.jobmaster; -import org.apache.flink.api.common.time.Time; -import org.apache.flink.runtime.checkpoint.CheckpointCoordinatorGateway; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.concurrent.Future; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; @@ -30,8 +28,18 @@ import org.apache.flink.runtime.io.network.partition.ResultPartition; import org.apache.flink.runtime.io.network.partition.ResultPartitionID; import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; import org.apache.flink.runtime.jobgraph.JobVertexID; -import org.apache.flink.runtime.messages.Acknowledge; -import org.apache.flink.runtime.rpc.RpcTimeout; +import org.apache.flink.runtime.jobmaster.message.ClassloadingProps; +import org.apache.flink.runtime.jobmaster.message.DisposeSavepointResponse; +import org.apache.flink.runtime.jobmaster.message.NextInputSplit; +import org.apache.flink.runtime.jobmaster.message.TriggerSavepointResponse; +import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint; +import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint; +import org.apache.flink.runtime.query.KvStateID; +import org.apache.flink.runtime.query.KvStateLocation; +import org.apache.flink.runtime.query.KvStateServerAddress; +import org.apache.flink.runtime.rpc.RpcGateway; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.KvState; import org.apache.flink.runtime.taskmanager.TaskExecutionState; import java.util.UUID; @@ -110,4 +118,81 @@ public interface JobMasterGateway extends CheckpointCoordinatorGateway { * @param resourceID identifying the TaskManager to disconnect */ void disconnectTaskManager(ResourceID resourceID); + void scheduleOrUpdateConsumers(final ResultPartitionID partitionID); + + /** + * Notifies the JobManager about the removal of a resource. + * + * @param resourceId The ID under which the resource is registered. + * @param message Optional message with details, for logging and debugging. + */ + + void resourceRemoved(final ResourceID resourceId, final String message); + + /** + * Notifies the JobManager that the checkpoint of an individual task is completed. + * + * @param acknowledge The acknowledge message of the checkpoint + */ + void acknowledgeCheckpoint(final AcknowledgeCheckpoint acknowledge); + + /** + * Notifies the JobManager that a checkpoint request could not be heeded. + * This can happen if a Task is already in RUNNING state but is internally not yet ready to perform checkpoints. + * + * @param decline The decline message of the checkpoint + */ + void declineCheckpoint(final DeclineCheckpoint decline); + + /** + * Requests a {@link KvStateLocation} for the specified {@link KvState} registration name. + * + * @param registrationName Name under which the KvState has been registered. + * @return Future of the requested {@link KvState} location + */ + Future<KvStateLocation> lookupKvStateLocation(final String registrationName) throws Exception; + + /** + * @param jobVertexId JobVertexID the KvState instance belongs to. + * @param keyGroupRange Key group range the KvState instance belongs to. + * @param registrationName Name under which the KvState has been registered. + * @param kvStateId ID of the registered KvState instance. + * @param kvStateServerAddress Server address where to find the KvState instance. + */ + void notifyKvStateRegistered( + final JobVertexID jobVertexId, + final KeyGroupRange keyGroupRange, + final String registrationName, + final KvStateID kvStateId, + final KvStateServerAddress kvStateServerAddress); + + /** + * @param jobVertexId JobVertexID the KvState instance belongs to. + * @param keyGroupRange Key group index the KvState instance belongs to. + * @param registrationName Name under which the KvState has been registered. + */ + void notifyKvStateUnregistered( + JobVertexID jobVertexId, + KeyGroupRange keyGroupRange, + String registrationName); + + /** + * Notifies the JobManager to trigger a savepoint for this job. + * + * @return Future of the savepoint trigger response. + */ + Future<TriggerSavepointResponse> triggerSavepoint(); + + /** + * Notifies the Jobmanager to dispose specified savepoint. + * + * @param savepointPath The path of the savepoint. + * @return The future of the savepoint disponse response. + */ + Future<DisposeSavepointResponse> disposeSavepoint(final String savepointPath); + + /** + * Request the classloading props of this job. + */ + Future<ClassloadingProps> requestClassloadingProps(); } http://git-wip-us.apache.org/repos/asf/flink/blob/041dfd78/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/message/ClassloadingProps.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/message/ClassloadingProps.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/message/ClassloadingProps.java new file mode 100644 index 0000000..2d670b4 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/message/ClassloadingProps.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobmaster.message; + +import org.apache.flink.runtime.blob.BlobKey; + +import java.io.Serializable; +import java.net.URL; +import java.util.List; + +/** + * The response of classloading props request to JobManager. + */ +public class ClassloadingProps implements Serializable { + + private static final long serialVersionUID = -3282341310808511823L; + + private final int blobManagerPort; + + private final List<BlobKey> requiredJarFiles; + + private final List<URL> requiredClasspaths; + + /** + * Constructor of ClassloadingProps. + * + * @param blobManagerPort The port of the blobManager + * @param requiredJarFiles The blob keys of the required jar files + * @param requiredClasspaths The urls of the required classpaths + */ + public ClassloadingProps( + final int blobManagerPort, + final List<BlobKey> requiredJarFiles, + final List<URL> requiredClasspaths) + { + this.blobManagerPort = blobManagerPort; + this.requiredJarFiles = requiredJarFiles; + this.requiredClasspaths = requiredClasspaths; + } + + public int getBlobManagerPort() { + return blobManagerPort; + } + + public List<BlobKey> getRequiredJarFiles() { + return requiredJarFiles; + } + + public List<URL> getRequiredClasspaths() { + return requiredClasspaths; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/041dfd78/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/message/DisposeSavepointResponse.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/message/DisposeSavepointResponse.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/message/DisposeSavepointResponse.java new file mode 100644 index 0000000..42bfc71 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/message/DisposeSavepointResponse.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobmaster.message; + +import java.io.Serializable; + +/** + * The response of the dispose savepoint request to JobManager. + */ +public abstract class DisposeSavepointResponse implements Serializable { + + private static final long serialVersionUID = 6008792963949369567L; + + public static class Success extends DisposeSavepointResponse implements Serializable { + + private static final long serialVersionUID = 1572462960008711415L; + } + + public static class Failure extends DisposeSavepointResponse implements Serializable { + + private static final long serialVersionUID = -7505308325483022458L; + + private final Throwable cause; + + public Failure(final Throwable cause) { + this.cause = cause; + } + + public Throwable getCause() { + return cause; + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/041dfd78/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/message/TriggerSavepointResponse.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/message/TriggerSavepointResponse.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/message/TriggerSavepointResponse.java new file mode 100644 index 0000000..0b0edc5 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/message/TriggerSavepointResponse.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobmaster.message; + +import org.apache.flink.api.common.JobID; + +import java.io.Serializable; + +/** + * The response of the trigger savepoint request to JobManager. + */ +public abstract class TriggerSavepointResponse implements Serializable { + + private static final long serialVersionUID = 3139327824611807707L; + + private final JobID jobID; + + public JobID getJobID() { + return jobID; + } + + public TriggerSavepointResponse(final JobID jobID) { + this.jobID = jobID; + } + + public static class Success extends TriggerSavepointResponse implements Serializable { + + private static final long serialVersionUID = -1100637460388881776L; + + private final String savepointPath; + + public Success(final JobID jobID, final String savepointPath) { + super(jobID); + this.savepointPath = savepointPath; + } + + public String getSavepointPath() { + return savepointPath; + } + } + + public static class Failure extends TriggerSavepointResponse implements Serializable { + + private static final long serialVersionUID = -1668479003490615139L; + + private final Throwable cause; + + public Failure(final JobID jobID, final Throwable cause) { + super(jobID); + this.cause = cause; + } + + public Throwable getCause() { + return cause; + } + } +} +