[FLINK-4735] [cluster management] Implements some job execution related RPC 
calls on the JobManager


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/041dfd78
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/041dfd78
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/041dfd78

Branch: refs/heads/flip-6
Commit: 041dfd78a70a45a2b697029c8c1e914050ffee91
Parents: cef3191
Author: Kurt Young <ykt...@gmail.com>
Authored: Tue Oct 4 23:00:22 2016 +0800
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Oct 14 15:14:43 2016 +0200

----------------------------------------------------------------------
 .../flink/runtime/jobmaster/JobMaster.java      | 246 +++++++++++++++++--
 .../runtime/jobmaster/JobMasterGateway.java     |  93 ++++++-
 .../jobmaster/message/ClassloadingProps.java    |  68 +++++
 .../message/DisposeSavepointResponse.java       |  49 ++++
 .../message/TriggerSavepointResponse.java       |  74 ++++++
 5 files changed, 507 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/041dfd78/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 8f3a342..3b8fc97 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -28,6 +28,7 @@ import org.apache.flink.core.io.InputSplit;
 import org.apache.flink.core.io.InputSplitAssigner;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
 import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
@@ -39,8 +40,11 @@ import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.client.JobSubmissionException;
 import org.apache.flink.runtime.client.SerializedJobExecutionResult;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.concurrent.BiFunction;
 import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
+import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
+import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
 import org.apache.flink.runtime.executiongraph.Execution;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
@@ -61,10 +65,20 @@ import 
org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator;
 import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
 import org.apache.flink.runtime.jobmanager.OnCompletionActions;
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
+import org.apache.flink.runtime.jobmaster.message.ClassloadingProps;
+import org.apache.flink.runtime.jobmaster.message.DisposeSavepointResponse;
+import org.apache.flink.runtime.jobmaster.message.NextInputSplit;
+import org.apache.flink.runtime.jobmaster.message.TriggerSavepointResponse;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
-import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
+import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
 import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
+import org.apache.flink.runtime.query.KvStateID;
+import org.apache.flink.runtime.query.KvStateLocation;
+import org.apache.flink.runtime.query.KvStateLocationRegistry;
+import org.apache.flink.runtime.query.KvStateServerAddress;
+import org.apache.flink.runtime.query.UnknownKvStateLocation;
 import org.apache.flink.runtime.registration.RegisteredRpcConnection;
 import org.apache.flink.runtime.registration.RegistrationResponse;
 import org.apache.flink.runtime.registration.RetryingRegistration;
@@ -72,7 +86,7 @@ import 
org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.rpc.RpcEndpoint;
 import org.apache.flink.runtime.rpc.RpcMethod;
 import org.apache.flink.runtime.rpc.RpcService;
-import org.apache.flink.runtime.state.CheckpointStateHandles;
+import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.runtime.util.SerializedThrowable;
 import org.apache.flink.util.InstantiationUtil;
@@ -520,22 +534,6 @@ public class JobMaster extends 
RpcEndpoint<JobMasterGateway> {
                        throw new ExecutionGraphException("The execution 
attempt " +
                                taskExecutionState.getID() + " was not found.");
                }
-
-       }
-
-       
//----------------------------------------------------------------------------------------------

-       // Internal methods

-       // 
----------------------------------------------------------------------------------------------


-
-       private void handleFatalError(final Throwable cause) {
-               runAsync(new Runnable() {
-                       @Override
-                       public void run() {
-                               log.error("Fatal error occurred on JobManager, 
cause: {}", cause.getMessage(), cause);
-                               shutDown();
-                               jobCompletionActions.onFatalError(cause);
-                       }
-               });
        }
 
        @RpcMethod
@@ -631,10 +629,220 @@ public class JobMaster extends 
RpcEndpoint<JobMasterGateway> {
                throw new UnsupportedOperationException();
        }
 
+       @RpcMethod
+       public void resourceRemoved(final ResourceID resourceId, final String 
message) {
+               // TODO: remove resource from slot pool
+       }
+
+       @RpcMethod
+       public void acknowledgeCheckpoint(final AcknowledgeCheckpoint 
acknowledge) {
+               if (executionGraph != null) {
+                       final CheckpointCoordinator checkpointCoordinator = 
executionGraph.getCheckpointCoordinator();
+                       if (checkpointCoordinator != null) {
+                               getRpcService().execute(new Runnable() {
+                                       @Override
+                                       public void run() {
+                                               try {
+                                                       if 
(!checkpointCoordinator.receiveAcknowledgeMessage(acknowledge)) {
+                                                               
log.info("Received message for non-existing checkpoint {}.",
+                                                                       
acknowledge.getCheckpointId());
+                                                       }
+                                               } catch (Exception e) {
+                                                       log.error("Error in 
CheckpointCoordinator while processing {}", acknowledge, e);
+                                               }
+                                       }
+                               });
+                       }
+                       else {
+                               log.error("Received AcknowledgeCheckpoint 
message for job {} with no CheckpointCoordinator",
+                                       jobGraph.getJobID());
+                       }
+               } else {
+                       log.error("Received AcknowledgeCheckpoint for 
unavailable job {}", jobGraph.getJobID());
+               }
+       }
+
+       @RpcMethod
+       public void declineCheckpoint(final DeclineCheckpoint decline) {
+               if (executionGraph != null) {
+                       final CheckpointCoordinator checkpointCoordinator = 
executionGraph.getCheckpointCoordinator();
+                       if (checkpointCoordinator != null) {
+                               getRpcService().execute(new Runnable() {
+                                       @Override
+                                       public void run() {
+                                               try {
+                                                       if 
(!checkpointCoordinator.receiveDeclineMessage(decline)) {
+                                                               
log.info("Received message for non-existing checkpoint {}.", 
decline.getCheckpointId());
+                                                       }
+                                               } catch (Exception e) {
+                                                       log.error("Error in 
CheckpointCoordinator while processing {}", decline, e);
+                                               }
+                                       }
+                               });
+                       } else {
+                               log.error("Received DeclineCheckpoint message 
for job {} with no CheckpointCoordinator",
+                                       jobGraph.getJobID());
+                       }
+               } else {
+                       log.error("Received AcknowledgeCheckpoint for 
unavailable job {}", jobGraph.getJobID());
+               }
+       }
+
+       @RpcMethod
+       public KvStateLocation lookupKvStateLocation(final String 
registrationName) throws Exception {
+               if (executionGraph != null) {
+                       if (log.isDebugEnabled()) {
+                               log.debug("Lookup key-value state for job {} 
with registration " +
+                                       "name {}.", jobGraph.getJobID(), 
registrationName);
+                       }
+
+                       final KvStateLocationRegistry registry = 
executionGraph.getKvStateLocationRegistry();
+                       final KvStateLocation location = 
registry.getKvStateLocation(registrationName);
+                       if (location != null) {
+                               return location;
+                       } else {
+                               throw new 
UnknownKvStateLocation(registrationName);
+                       }
+               } else {
+                       throw new IllegalStateException("Received lookup 
KvState location request for unavailable job " +
+                               jobGraph.getJobID());
+               }
+       }
+
+       @RpcMethod
+       public void notifyKvStateRegistered(
+               final JobVertexID jobVertexId,
+               final KeyGroupRange keyGroupRange,
+               final String registrationName,
+               final KvStateID kvStateId,
+               final KvStateServerAddress kvStateServerAddress)
+       {
+               if (executionGraph != null) {
+                       if (log.isDebugEnabled()) {
+                               log.debug("Key value state registered for job 
{} under name {}.",
+                                       jobGraph.getJobID(), registrationName);
+                       }
+                       try {
+                               
executionGraph.getKvStateLocationRegistry().notifyKvStateRegistered(
+                                       jobVertexId, keyGroupRange, 
registrationName, kvStateId, kvStateServerAddress
+                               );
+                       } catch (Exception e) {
+                               log.error("Failed to notify KvStateRegistry 
about registration {}.", registrationName);
+                       }
+               } else {
+                       log.error("Received notify KvState registered request 
for unavailable job " + jobGraph.getJobID());
+               }
+       }
+
+       @RpcMethod
+       public void notifyKvStateUnregistered(
+               JobVertexID jobVertexId,
+               KeyGroupRange keyGroupRange,
+               String registrationName)
+       {
+               if (executionGraph != null) {
+                       if (log.isDebugEnabled()) {
+                               log.debug("Key value state unregistered for job 
{} under name {}.",
+                                       jobGraph.getJobID(), registrationName);
+                       }
+                       try {
+                               
executionGraph.getKvStateLocationRegistry().notifyKvStateUnregistered(
+                                       jobVertexId, keyGroupRange, 
registrationName
+                               );
+                       } catch (Exception e) {
+                               log.error("Failed to notify KvStateRegistry 
about registration {}.", registrationName);
+                       }
+               } else {
+                       log.error("Received notify KvState unregistered request 
for unavailable job " + jobGraph.getJobID());
+               }
+       }
+
+       @RpcMethod
+       public Future<TriggerSavepointResponse> triggerSavepoint() throws 
Exception {
+               if (executionGraph != null) {
+                       final CheckpointCoordinator checkpointCoordinator = 
executionGraph.getCheckpointCoordinator();
+                       if (checkpointCoordinator != null) {
+                               try {
+                                       Future<String> savepointFuture = new 
FlinkFuture<>(
+                                               
checkpointCoordinator.triggerSavepoint(System.currentTimeMillis()));
+
+                                       return savepointFuture.handleAsync(new 
BiFunction<String, Throwable, TriggerSavepointResponse>() {
+                                               @Override
+                                               public TriggerSavepointResponse 
apply(String savepointPath, Throwable throwable) {
+                                                       if (throwable == null) {
+                                                               return new 
TriggerSavepointResponse.Success(jobGraph.getJobID(), savepointPath);
+                                                       }
+                                                       else {
+                                                               return new 
TriggerSavepointResponse.Failure(jobGraph.getJobID(),
+                                                                       new 
Exception("Failed to complete savepoint", throwable));
+                                                       }
+                                               }
+                                       }, getMainThreadExecutor());
+
+                               } catch (Exception e) {
+                                       
FlinkCompletableFuture<TriggerSavepointResponse> future = new 
FlinkCompletableFuture<>();
+                                       future.complete(new 
TriggerSavepointResponse.Failure(jobGraph.getJobID(),
+                                               new Exception("Failed to 
trigger savepoint", e)));
+                                       return future;
+                               }
+                       } else {
+                               
FlinkCompletableFuture<TriggerSavepointResponse> future = new 
FlinkCompletableFuture<>();
+                               future.complete(new 
TriggerSavepointResponse.Failure(jobGraph.getJobID(),
+                                       new 
IllegalStateException("Checkpointing disabled. You can enable it via the 
execution " +
+                                               "environment of your job.")));
+                               return future;
+                       }
+               } else {
+                       FlinkCompletableFuture<TriggerSavepointResponse> future 
= new FlinkCompletableFuture<>();
+                       future.complete(new 
TriggerSavepointResponse.Failure(jobGraph.getJobID(),
+                               new IllegalArgumentException("Received trigger 
savepoint request for unavailable job " +
+                                       jobGraph.getJobID())));
+                       return future;
+               }
+       }
+
+       @RpcMethod
+       public DisposeSavepointResponse disposeSavepoint(final String 
savepointPath) {
+               try {
+                       log.info("Disposing savepoint at {}.", savepointPath);
+
+                       // check whether the savepoint exists
+                       savepointStore.loadSavepoint(savepointPath);
+
+                       savepointStore.disposeSavepoint(savepointPath);
+                       return new DisposeSavepointResponse.Success();
+               } catch (Exception e) {
+                       log.error("Failed to dispose savepoint at {}.", 
savepointPath, e);
+                       return new DisposeSavepointResponse.Failure(e);
+               }
+       }
+
+       @RpcMethod
+       public ClassloadingProps requestClassloadingProps() throws Exception {
+               if (executionGraph != null) {
+                       return new 
ClassloadingProps(libraryCacheManager.getBlobServerPort(),
+                               executionGraph.getRequiredJarFiles(),
+                               executionGraph.getRequiredClasspaths());
+               } else {
+                       throw new Exception("Received classloading props 
request for unavailable job " + jobGraph.getJobID());
+               }
+       }
+
        
//----------------------------------------------------------------------------------------------
        // Internal methods
        
//----------------------------------------------------------------------------------------------
 
+       private void handleFatalError(final Throwable cause) {
+               runAsync(new Runnable() {
+                       @Override
+                       public void run() {
+                               log.error("Fatal error occurred on JobManager, 
cause: {}", cause.getMessage(), cause);
+                               shutDown();
+                               jobCompletionActions.onFatalError(cause);
+                       }
+               });
+       }
+
        // TODO - wrap this as StatusListenerMessenger's callback with rpc main 
thread
        private void jobStatusChanged(final JobStatus newJobStatus, long 
timestamp, final Throwable error) {
                final JobID jobID = executionGraph.getJobID();

http://git-wip-us.apache.org/repos/asf/flink/blob/041dfd78/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
index e3e57d4..4b51258 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.runtime.jobmaster;
 
-import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.checkpoint.CheckpointCoordinatorGateway;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
@@ -30,8 +28,18 @@ import 
org.apache.flink.runtime.io.network.partition.ResultPartition;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.messages.Acknowledge;
-import org.apache.flink.runtime.rpc.RpcTimeout;
+import org.apache.flink.runtime.jobmaster.message.ClassloadingProps;
+import org.apache.flink.runtime.jobmaster.message.DisposeSavepointResponse;
+import org.apache.flink.runtime.jobmaster.message.NextInputSplit;
+import org.apache.flink.runtime.jobmaster.message.TriggerSavepointResponse;
+import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
+import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
+import org.apache.flink.runtime.query.KvStateID;
+import org.apache.flink.runtime.query.KvStateLocation;
+import org.apache.flink.runtime.query.KvStateServerAddress;
+import org.apache.flink.runtime.rpc.RpcGateway;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KvState;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 
 import java.util.UUID;
@@ -110,4 +118,81 @@ public interface JobMasterGateway extends 
CheckpointCoordinatorGateway {
         * @param resourceID identifying the TaskManager to disconnect
         */
        void disconnectTaskManager(ResourceID resourceID);
+       void scheduleOrUpdateConsumers(final ResultPartitionID partitionID);
+
+       /**
+        * Notifies the JobManager about the removal of a resource.
+        *
+        * @param resourceId The ID under which the resource is registered.
+        * @param message    Optional message with details, for logging and 
debugging.
+        */
+
+       void resourceRemoved(final ResourceID resourceId, final String message);
+
+       /**
+        * Notifies the JobManager that the checkpoint of an individual task is 
completed.
+        *
+        * @param acknowledge The acknowledge message of the checkpoint
+        */
+       void acknowledgeCheckpoint(final AcknowledgeCheckpoint acknowledge);
+
+       /**
+        * Notifies the JobManager that a checkpoint request could not be 
heeded.
+        * This can happen if a Task is already in RUNNING state but is 
internally not yet ready to perform checkpoints.
+        *
+        * @param decline The decline message of the checkpoint
+        */
+       void declineCheckpoint(final DeclineCheckpoint decline);
+
+       /**
+        * Requests a {@link KvStateLocation} for the specified {@link KvState} 
registration name.
+        *
+        * @param registrationName Name under which the KvState has been 
registered.
+        * @return Future of the requested {@link KvState} location
+        */
+       Future<KvStateLocation> lookupKvStateLocation(final String 
registrationName) throws Exception;
+
+       /**
+        * @param jobVertexId          JobVertexID the KvState instance belongs 
to.
+        * @param keyGroupRange        Key group range the KvState instance 
belongs to.
+        * @param registrationName     Name under which the KvState has been 
registered.
+        * @param kvStateId            ID of the registered KvState instance.
+        * @param kvStateServerAddress Server address where to find the KvState 
instance.
+        */
+       void notifyKvStateRegistered(
+               final JobVertexID jobVertexId,
+               final KeyGroupRange keyGroupRange,
+               final String registrationName,
+               final KvStateID kvStateId,
+               final KvStateServerAddress kvStateServerAddress);
+
+       /**
+        * @param jobVertexId      JobVertexID the KvState instance belongs to.
+        * @param keyGroupRange    Key group index the KvState instance belongs 
to.
+        * @param registrationName Name under which the KvState has been 
registered.
+        */
+       void notifyKvStateUnregistered(
+               JobVertexID jobVertexId,
+               KeyGroupRange keyGroupRange,
+               String registrationName);
+
+       /**
+        * Notifies the JobManager to trigger a savepoint for this job.
+        *
+        * @return Future of the savepoint trigger response.
+        */
+       Future<TriggerSavepointResponse> triggerSavepoint();
+
+       /**
+        * Notifies the Jobmanager to dispose specified savepoint.
+        *
+        * @param savepointPath The path of the savepoint.
+        * @return The future of the savepoint disponse response.
+        */
+       Future<DisposeSavepointResponse> disposeSavepoint(final String 
savepointPath);
+
+       /**
+        * Request the classloading props of this job.
+        */
+       Future<ClassloadingProps> requestClassloadingProps();
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/041dfd78/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/message/ClassloadingProps.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/message/ClassloadingProps.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/message/ClassloadingProps.java
new file mode 100644
index 0000000..2d670b4
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/message/ClassloadingProps.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.message;
+
+import org.apache.flink.runtime.blob.BlobKey;
+
+import java.io.Serializable;
+import java.net.URL;
+import java.util.List;
+
+/**
+ * The response of classloading props request to JobManager.
+ */
+public class ClassloadingProps implements Serializable {
+
+       private static final long serialVersionUID = -3282341310808511823L;
+
+       private final int blobManagerPort;
+
+       private final List<BlobKey> requiredJarFiles;
+
+       private final List<URL> requiredClasspaths;
+
+       /**
+        * Constructor of ClassloadingProps.
+        *
+        * @param blobManagerPort    The port of the blobManager
+        * @param requiredJarFiles   The blob keys of the required jar files
+        * @param requiredClasspaths The urls of the required classpaths
+        */
+       public ClassloadingProps(
+               final int blobManagerPort,
+               final List<BlobKey> requiredJarFiles,
+               final List<URL> requiredClasspaths)
+       {
+               this.blobManagerPort = blobManagerPort;
+               this.requiredJarFiles = requiredJarFiles;
+               this.requiredClasspaths = requiredClasspaths;
+       }
+
+       public int getBlobManagerPort() {
+               return blobManagerPort;
+       }
+
+       public List<BlobKey> getRequiredJarFiles() {
+               return requiredJarFiles;
+       }
+
+       public List<URL> getRequiredClasspaths() {
+               return requiredClasspaths;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/041dfd78/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/message/DisposeSavepointResponse.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/message/DisposeSavepointResponse.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/message/DisposeSavepointResponse.java
new file mode 100644
index 0000000..42bfc71
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/message/DisposeSavepointResponse.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.message;
+
+import java.io.Serializable;
+
+/**
+ * The response of the dispose savepoint request to JobManager.
+ */
+public abstract class DisposeSavepointResponse implements Serializable {
+
+       private static final long serialVersionUID = 6008792963949369567L;
+
+       public static class Success extends DisposeSavepointResponse implements 
Serializable {
+
+               private static final long serialVersionUID = 
1572462960008711415L;
+       }
+
+       public static class Failure extends DisposeSavepointResponse implements 
Serializable {
+
+               private static final long serialVersionUID = 
-7505308325483022458L;
+
+               private final Throwable cause;
+
+               public Failure(final Throwable cause) {
+                       this.cause = cause;
+               }
+
+               public Throwable getCause() {
+                       return cause;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/041dfd78/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/message/TriggerSavepointResponse.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/message/TriggerSavepointResponse.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/message/TriggerSavepointResponse.java
new file mode 100644
index 0000000..0b0edc5
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/message/TriggerSavepointResponse.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.message;
+
+import org.apache.flink.api.common.JobID;
+
+import java.io.Serializable;
+
+/**
+ * The response of the trigger savepoint request to JobManager.
+ */
+public abstract class TriggerSavepointResponse implements Serializable {
+
+       private static final long serialVersionUID = 3139327824611807707L;
+
+       private final JobID jobID;
+
+       public JobID getJobID() {
+               return jobID;
+       }
+
+       public TriggerSavepointResponse(final JobID jobID) {
+               this.jobID = jobID;
+       }
+
+       public static class Success extends TriggerSavepointResponse implements 
Serializable {
+
+               private static final long serialVersionUID = 
-1100637460388881776L;
+
+               private final String savepointPath;
+
+               public Success(final JobID jobID, final String savepointPath) {
+                       super(jobID);
+                       this.savepointPath = savepointPath;
+               }
+
+               public String getSavepointPath() {
+                       return savepointPath;
+               }
+       }
+
+       public static class Failure extends TriggerSavepointResponse implements 
Serializable {
+
+               private static final long serialVersionUID = 
-1668479003490615139L;
+
+               private final Throwable cause;
+
+               public Failure(final JobID jobID, final Throwable cause) {
+                       super(jobID);
+                       this.cause = cause;
+               }
+
+               public Throwable getCause() {
+                       return cause;
+               }
+       }
+}
+

Reply via email to