[FLINK-7072] [REST] Define protocol for job submit/cancel/stop

[FLINK-7072] [REST] Extend Dispatcher

[FLINK-7072] [REST] Add handlers for job submit/cancel/stop

[FLINK-7072] [REST] CLI integration

use ExecutorThreadFactory + rebase(blobKey fix)

add "Flink" prefix to RestCC threads

shutdown client for cancel/stop

Rework CliFrontEnd Stop/Cancel tests

These tests verified that the CLI was sending the correct messages and
parameters to the JM actor. This is now handled by the ClusterClient, so
the tests were adjusted to verify that the correct methods on the
ClusterClient are being called.

Additional tests were added to the ClusterClientTest class to verify
that the correct messages and parameters are being sent.

This closes #4742.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ad380463
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ad380463
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ad380463

Branch: refs/heads/master
Commit: ad380463d3d44cdd98302bf072bc5deba8696b5b
Parents: 6299533
Author: zentol <[email protected]>
Authored: Wed Sep 20 14:55:46 2017 +0200
Committer: Till Rohrmann <[email protected]>
Committed: Mon Oct 9 19:11:44 2017 +0200

----------------------------------------------------------------------
 .../org/apache/flink/client/CliFrontend.java    |  79 +++----
 .../flink/client/cli/Flip6DefaultCLI.java       |  98 ++++++++
 .../Flip6StandaloneClusterDescriptor.java       |  63 ++++++
 .../flink/client/program/ClusterClient.java     |  77 ++++---
 .../client/program/rest/RestClusterClient.java  | 221 +++++++++++++++++++
 .../rest/RestClusterClientConfiguration.java    |  78 +++++++
 .../flink/client/CliFrontendListCancelTest.java | 123 ++++++-----
 .../apache/flink/client/CliFrontendRunTest.java |   9 +
 .../flink/client/CliFrontendStopTest.java       | 118 ++++------
 .../flink/client/program/ClusterClientTest.java | 143 ++++++++++++
 .../program/rest/RestClusterClientTest.java     | 192 ++++++++++++++++
 .../flink/runtime/dispatcher/Dispatcher.java    |   5 +
 .../runtime/dispatcher/DispatcherGateway.java   |   8 +
 .../dispatcher/DispatcherRestEndpoint.java      |   8 +
 .../flink/runtime/rest/RestServerEndpoint.java  |   4 +-
 .../rest/handler/job/BlobServerPortHandler.java |  61 +++++
 .../rest/handler/job/JobSubmitHandler.java      |  66 ++++++
 .../rest/messages/BlobServerPortHeaders.java    |  69 ++++++
 .../messages/BlobServerPortResponseBody.java    |  57 +++++
 .../JobTerminationMessageParameters.java        |   4 +-
 .../rest/messages/job/JobSubmitHeaders.java     |  71 ++++++
 .../rest/messages/job/JobSubmitRequestBody.java |  88 ++++++++
 .../messages/job/JobSubmitResponseBody.java     |  61 +++++
 .../handler/job/BlobServerPortHandlerTest.java  |  97 ++++++++
 .../rest/handler/job/JobSubmitHandlerTest.java  |  87 ++++++++
 .../messages/BlobServerPortResponseTest.java    |  37 ++++
 .../rest/messages/JobSubmitRequestBodyTest.java |  41 ++++
 .../messages/JobSubmitResponseBodyTest.java     |  38 ++++
 28 files changed, 1790 insertions(+), 213 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ad380463/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
----------------------------------------------------------------------
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java 
b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
index 9d1f52e..9be8295 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
@@ -30,6 +30,7 @@ import org.apache.flink.client.cli.CliFrontendParser;
 import org.apache.flink.client.cli.CommandLineOptions;
 import org.apache.flink.client.cli.CustomCommandLine;
 import org.apache.flink.client.cli.DefaultCLI;
+import org.apache.flink.client.cli.Flip6DefaultCLI;
 import org.apache.flink.client.cli.InfoOptions;
 import org.apache.flink.client.cli.ListOptions;
 import org.apache.flink.client.cli.ProgramOptions;
@@ -59,13 +60,7 @@ import org.apache.flink.runtime.client.JobStatusMessage;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.messages.JobManagerMessages;
-import org.apache.flink.runtime.messages.JobManagerMessages.CancelJob;
-import 
org.apache.flink.runtime.messages.JobManagerMessages.CancelJobWithSavepoint;
-import 
org.apache.flink.runtime.messages.JobManagerMessages.CancellationFailure;
-import 
org.apache.flink.runtime.messages.JobManagerMessages.CancellationSuccess;
 import org.apache.flink.runtime.messages.JobManagerMessages.RunningJobsStatus;
-import org.apache.flink.runtime.messages.JobManagerMessages.StopJob;
-import org.apache.flink.runtime.messages.JobManagerMessages.StoppingFailure;
 import org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepoint;
 import 
org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepointSuccess;
 import org.apache.flink.runtime.security.SecurityConfiguration;
@@ -146,6 +141,8 @@ public class CliFrontend {
                } catch (Exception e) {
                        LOG.warn("Could not load CLI class {}.", flinkYarnCLI, 
e);
                }
+
+               customCommandLines.add(new Flip6DefaultCLI());
                customCommandLines.add(new DefaultCLI());
        }
 
@@ -555,17 +552,18 @@ public class CliFrontend {
                }
 
                try {
-                       ActorGateway jobManager = getJobManagerGateway(options);
-                       Future<Object> response = jobManager.ask(new 
StopJob(jobId), clientTimeout);
-
-                       final Object rc = Await.result(response, clientTimeout);
+                       CustomCommandLine<?> activeCommandLine = 
getActiveCustomCommandLine(options.getCommandLine());
+                       ClusterClient client = 
activeCommandLine.retrieveCluster(options.getCommandLine(), config, 
configurationDirectory);
+                       try {
+                               logAndSysout("Stopping job " + jobId + '.');
+                               client.stop(jobId);
+                               logAndSysout("Stopped job " + jobId + '.');
 
-                       if (rc instanceof StoppingFailure) {
-                               throw new Exception("Stopping the job with ID " 
+ jobId + " failed.",
-                                               ((StoppingFailure) rc).cause());
+                               return 0;
+                       } finally {
+                               client.shutdown();
                        }
 
-                       return 0;
                }
                catch (Throwable t) {
                        return handleError(t);
@@ -636,40 +634,27 @@ public class CliFrontend {
                }
 
                try {
-                       ActorGateway jobManager = getJobManagerGateway(options);
-
-                       Object cancelMsg;
-                       if (withSavepoint) {
-                               if (targetDirectory == null) {
-                                       logAndSysout("Cancelling job " + jobId 
+ " with savepoint to default savepoint directory.");
-                               } else {
-                                       logAndSysout("Cancelling job " + jobId 
+ " with savepoint to " + targetDirectory + ".");
-                               }
-                               cancelMsg = new CancelJobWithSavepoint(jobId, 
targetDirectory);
-                       } else {
-                               logAndSysout("Cancelling job " + jobId + ".");
-                               cancelMsg = new CancelJob(jobId);
-                       }
-
-                       Future<Object> response = jobManager.ask(cancelMsg, 
clientTimeout);
-                       final Object rc = Await.result(response, clientTimeout);
-
-                       if (rc instanceof CancellationSuccess) {
+                       CustomCommandLine<?> activeCommandLine = 
getActiveCustomCommandLine(options.getCommandLine());
+                       ClusterClient client = 
activeCommandLine.retrieveCluster(options.getCommandLine(), config, 
configurationDirectory);
+                       try {
                                if (withSavepoint) {
-                                       CancellationSuccess success = 
(CancellationSuccess) rc;
-                                       String savepointPath = 
success.savepointPath();
-                                       logAndSysout("Cancelled job " + jobId + 
". Savepoint stored in " + savepointPath + ".");
+                                       if (targetDirectory == null) {
+                                               logAndSysout("Cancelling job " 
+ jobId + " with savepoint to default savepoint directory.");
+                                       } else {
+                                               logAndSysout("Cancelling job " 
+ jobId + " with savepoint to " + targetDirectory + '.');
+                                       }
+                                       String savepointPath = 
client.cancelWithSavepoint(jobId, targetDirectory);
+                                       logAndSysout("Cancelled job " + jobId + 
". Savepoint stored in " + savepointPath + '.');
                                } else {
-                                       logAndSysout("Cancelled job " + jobId + 
".");
+                                       logAndSysout("Cancelling job " + jobId 
+ '.');
+                                       client.cancel(jobId);
+                                       logAndSysout("Cancelled job " + jobId + 
'.');
                                }
-                       } else if (rc instanceof CancellationFailure) {
-                               throw new Exception("Canceling the job with ID 
" + jobId + " failed.",
-                                               ((CancellationFailure) 
rc).cause());
-                       } else {
-                               throw new IllegalStateException("Unexpected 
response: " + rc);
-                       }
 
-                       return 0;
+                               return 0;
+                       } finally {
+                               client.shutdown();
+                       }
                }
                catch (Throwable t) {
                        return handleError(t);
@@ -978,7 +963,11 @@ public class CliFrontend {
                // Avoid resolving the JobManager Gateway here to prevent 
blocking until we invoke the user's program.
                final InetSocketAddress jobManagerAddress = 
client.getJobManagerAddress();
                logAndSysout("Using address " + 
jobManagerAddress.getHostString() + ":" + jobManagerAddress.getPort() + " to 
connect to JobManager.");
-               logAndSysout("JobManager web interface address " + 
client.getWebInterfaceURL());
+               try {
+                       logAndSysout("JobManager web interface address " + 
client.getWebInterfaceURL());
+               } catch (UnsupportedOperationException uoe) {
+                       logAndSysout("JobManager web interface not active.");
+               }
                return client;
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ad380463/flink-clients/src/main/java/org/apache/flink/client/cli/Flip6DefaultCLI.java
----------------------------------------------------------------------
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/cli/Flip6DefaultCLI.java 
b/flink-clients/src/main/java/org/apache/flink/client/cli/Flip6DefaultCLI.java
new file mode 100644
index 0000000..5fb9dfc
--- /dev/null
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/cli/Flip6DefaultCLI.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.cli;
+
+import org.apache.flink.client.ClientUtils;
+import org.apache.flink.client.deployment.ClusterSpecification;
+import org.apache.flink.client.deployment.Flip6StandaloneClusterDescriptor;
+import org.apache.flink.client.program.rest.RestClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+
+import java.net.InetSocketAddress;
+import java.net.URL;
+import java.util.List;
+
+import static org.apache.flink.client.CliFrontend.setJobManagerAddressInConfig;
+
+/**
+ * The default CLI which is used for interaction with FLIP-6 standalone clusters.
+ */
+public class Flip6DefaultCLI implements CustomCommandLine<RestClusterClient> {
+
+       public static final Option FLIP_6 = new Option("flip6", "Switches the 
client to Flip-6 mode.");
+
+       static {
+               FLIP_6.setRequired(false);
+       }
+
+       @Override
+       public boolean isActive(CommandLine commandLine, Configuration 
configuration) {
+               return commandLine.hasOption(FLIP_6.getOpt());
+       }
+
+       @Override
+       public String getId() {
+               return "flip6";
+       }
+
+       @Override
+       public void addRunOptions(Options baseOptions) {
+       }
+
+       @Override
+       public void addGeneralOptions(Options baseOptions) {
+               baseOptions.addOption(FLIP_6);
+       }
+
+       @Override
+       public RestClusterClient retrieveCluster(CommandLine commandLine, 
Configuration config, String configurationDirectory) {
+               if 
(commandLine.hasOption(CliFrontendParser.ADDRESS_OPTION.getOpt())) {
+                       String addressWithPort = 
commandLine.getOptionValue(CliFrontendParser.ADDRESS_OPTION.getOpt());
+                       InetSocketAddress jobManagerAddress = 
ClientUtils.parseHostPortAddress(addressWithPort);
+                       setJobManagerAddressInConfig(config, jobManagerAddress);
+               }
+
+               if 
(commandLine.hasOption(CliFrontendParser.ZOOKEEPER_NAMESPACE_OPTION.getOpt())) {
+                       String zkNamespace = 
commandLine.getOptionValue(CliFrontendParser.ZOOKEEPER_NAMESPACE_OPTION.getOpt());
+                       config.setString(HighAvailabilityOptions.HA_CLUSTER_ID, 
zkNamespace);
+               }
+
+               Flip6StandaloneClusterDescriptor descriptor = new 
Flip6StandaloneClusterDescriptor(config);
+               return descriptor.retrieve(null);
+       }
+
+       @Override
+       public RestClusterClient createCluster(
+                       String applicationName,
+                       CommandLine commandLine,
+                       Configuration config,
+                       String configurationDirectory,
+                       List<URL> userJarFiles) throws 
UnsupportedOperationException {
+
+               Flip6StandaloneClusterDescriptor descriptor = new 
Flip6StandaloneClusterDescriptor(config);
+               ClusterSpecification clusterSpecification = 
ClusterSpecification.fromConfiguration(config);
+
+               return descriptor.deploySessionCluster(clusterSpecification);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ad380463/flink-clients/src/main/java/org/apache/flink/client/deployment/Flip6StandaloneClusterDescriptor.java
----------------------------------------------------------------------
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/deployment/Flip6StandaloneClusterDescriptor.java
 
b/flink-clients/src/main/java/org/apache/flink/client/deployment/Flip6StandaloneClusterDescriptor.java
new file mode 100644
index 0000000..9d88f59
--- /dev/null
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/deployment/Flip6StandaloneClusterDescriptor.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.deployment;
+
+import org.apache.flink.client.program.rest.RestClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * A deployment descriptor for an existing FLIP-6 standalone cluster.
+ */
+public class Flip6StandaloneClusterDescriptor implements 
ClusterDescriptor<RestClusterClient> {
+
+       private final Configuration config;
+
+       public Flip6StandaloneClusterDescriptor(Configuration config) {
+               this.config = Preconditions.checkNotNull(config);
+       }
+
+       @Override
+       public String getClusterDescription() {
+               String host = config.getString(JobManagerOptions.ADDRESS, "");
+               int port = config.getInteger(JobManagerOptions.PORT, -1);
+               return "FLIP-6 Standalone cluster at " + host + ":" + port;
+       }
+
+       @Override
+       public RestClusterClient retrieve(String applicationID) {
+               try {
+                       return new RestClusterClient(config);
+               } catch (Exception e) {
+                       throw new RuntimeException("Couldn't retrieve FLIP-6 
standalone cluster", e);
+               }
+       }
+
+       @Override
+       public RestClusterClient deploySessionCluster(ClusterSpecification 
clusterSpecification) throws UnsupportedOperationException {
+               throw new UnsupportedOperationException("Can't deploy a FLIP-6 
standalone cluster.");
+       }
+
+       @Override
+       public RestClusterClient deployJobCluster(ClusterSpecification 
clusterSpecification, JobGraph jobGraph) {
+               throw new UnsupportedOperationException("Can't deploy a 
standalone FLIP-6 per-job cluster.");
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ad380463/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
----------------------------------------------------------------------
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
 
b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
index c8a236e..78455c1 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
@@ -63,6 +63,8 @@ import akka.actor.ActorSystem;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
+
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.URISyntaxException;
@@ -82,7 +84,7 @@ import scala.concurrent.duration.FiniteDuration;
  */
 public abstract class ClusterClient {
 
-       private final Logger log = LoggerFactory.getLogger(getClass());
+       protected final Logger log = LoggerFactory.getLogger(getClass());
 
        /** The optimizer used in the optimization of batch programs. */
        final Optimizer compiler;
@@ -575,25 +577,46 @@ public abstract class ClusterClient {
         * @throws Exception In case an error occurred.
         */
        public void cancel(JobID jobId) throws Exception {
-               final ActorGateway jobManagerGateway = getJobManagerGateway();
+               final ActorGateway jobManager = getJobManagerGateway();
 
-               final Future<Object> response;
-               try {
-                       response = jobManagerGateway.ask(new 
JobManagerMessages.CancelJob(jobId), timeout);
-               } catch (final Exception e) {
-                       throw new ProgramInvocationException("Failed to query 
the job manager gateway.", e);
+               Object cancelMsg = new JobManagerMessages.CancelJob(jobId);
+
+               Future<Object> response = jobManager.ask(cancelMsg, timeout);
+               final Object rc = Await.result(response, timeout);
+
+               if (rc instanceof JobManagerMessages.CancellationSuccess) {
+                       // no further action required
+               } else if (rc instanceof 
JobManagerMessages.CancellationFailure) {
+                       throw new Exception("Canceling the job with ID " + 
jobId + " failed.",
+                               ((JobManagerMessages.CancellationFailure) 
rc).cause());
+               } else {
+                       throw new IllegalStateException("Unexpected response: " 
+ rc);
                }
+       }
 
-               final Object result = Await.result(response, timeout);
+       /**
+        * Cancels a job identified by the job id and triggers a savepoint.
+        * @param jobId the job id
+        * @param savepointDirectory directory the savepoint should be written 
to
+        * @return path where the savepoint is located
+        * @throws Exception In case an error occurred.
+        */
+       public String cancelWithSavepoint(JobID jobId, @Nullable String 
savepointDirectory) throws Exception {
+               final ActorGateway jobManager = getJobManagerGateway();
+
+               Object cancelMsg = new 
JobManagerMessages.CancelJobWithSavepoint(jobId, savepointDirectory);
 
-               if (result instanceof JobManagerMessages.CancellationSuccess) {
-                       logAndSysout("Job cancellation with ID " + jobId + " 
succeeded.");
-               } else if (result instanceof 
JobManagerMessages.CancellationFailure) {
-                       final Throwable t = 
((JobManagerMessages.CancellationFailure) result).cause();
-                       logAndSysout("Job cancellation with ID " + jobId + " 
failed because of " + t.getMessage());
-                       throw new Exception("Failed to cancel the job with id " 
+ jobId, t);
+               Future<Object> response = jobManager.ask(cancelMsg, timeout);
+               final Object rc = Await.result(response, timeout);
+
+               if (rc instanceof JobManagerMessages.CancellationSuccess) {
+                       JobManagerMessages.CancellationSuccess success = 
(JobManagerMessages.CancellationSuccess) rc;
+                       return success.savepointPath();
+               } else if (rc instanceof 
JobManagerMessages.CancellationFailure) {
+                       throw new Exception("Cancel & savepoint for the job 
with ID " + jobId + " failed.",
+                               ((JobManagerMessages.CancellationFailure) 
rc).cause());
                } else {
-                       throw new Exception("Unknown message received while 
cancelling: " + result.getClass().getName());
+                       throw new IllegalStateException("Unexpected response: " 
+ rc);
                }
        }
 
@@ -610,25 +633,19 @@ public abstract class ClusterClient {
         *             failed. That might be due to an I/O problem, ie, the 
job-manager is unreachable.
         */
        public void stop(final JobID jobId) throws Exception {
-               final ActorGateway jobManagerGateway = getJobManagerGateway();
+               final ActorGateway jobManager = getJobManagerGateway();
 
-               final Future<Object> response;
-               try {
-                       response = jobManagerGateway.ask(new 
JobManagerMessages.StopJob(jobId), timeout);
-               } catch (final Exception e) {
-                       throw new ProgramInvocationException("Failed to query 
the job manager gateway.", e);
-               }
+               Future<Object> response = jobManager.ask(new 
JobManagerMessages.StopJob(jobId), timeout);
 
-               final Object result = Await.result(response, timeout);
+               final Object rc = Await.result(response, timeout);
 
-               if (result instanceof JobManagerMessages.StoppingSuccess) {
-                       log.info("Job stopping with ID " + jobId + " 
succeeded.");
-               } else if (result instanceof 
JobManagerMessages.StoppingFailure) {
-                       final Throwable t = 
((JobManagerMessages.StoppingFailure) result).cause();
-                       log.info("Job stopping with ID " + jobId + " failed.", 
t);
-                       throw new Exception("Failed to stop the job because of 
\n" + t.getMessage());
+               if (rc instanceof JobManagerMessages.StoppingSuccess) {
+                       // no further action required
+               } else if (rc instanceof JobManagerMessages.StoppingFailure) {
+                       throw new Exception("Stopping the job with ID " + jobId 
+ " failed.",
+                               ((JobManagerMessages.StoppingFailure) 
rc).cause());
                } else {
-                       throw new Exception("Unknown message received while 
stopping: " + result.getClass().getName());
+                       throw new IllegalStateException("Unexpected response: " 
+ rc);
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ad380463/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
----------------------------------------------------------------------
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
 
b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
new file mode 100644
index 0000000..a37ee63
--- /dev/null
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.program.rest;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobSubmissionResult;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.client.program.ProgramInvocationException;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.blob.BlobClient;
+import org.apache.flink.runtime.blob.PermanentBlobKey;
+import org.apache.flink.runtime.client.JobSubmissionException;
+import 
org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.rest.RestClient;
+import org.apache.flink.runtime.rest.messages.BlobServerPortHeaders;
+import org.apache.flink.runtime.rest.messages.BlobServerPortResponseBody;
+import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
+import org.apache.flink.runtime.rest.messages.JobTerminationHeaders;
+import org.apache.flink.runtime.rest.messages.JobTerminationMessageParameters;
+import org.apache.flink.runtime.rest.messages.TerminationModeQueryParameter;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitHeaders;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitRequestBody;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitResponseBody;
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
+
+import javax.annotation.Nullable;
+
+import java.net.InetSocketAddress;
+import java.net.URL;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A {@link ClusterClient} implementation that communicates via HTTP REST 
requests.
+ */
+public class RestClusterClient extends ClusterClient {
+
+       private final RestClusterClientConfiguration 
restClusterClientConfiguration;
+       private final RestClient restClient;
+       private final ExecutorService executorService = 
Executors.newFixedThreadPool(4, new 
ExecutorThreadFactory("Flink-RestClusterClient-IO"));
+
+       public RestClusterClient(Configuration config) throws Exception {
+               this(config, 
RestClusterClientConfiguration.fromConfiguration(config));
+       }
+
+       public RestClusterClient(Configuration config, 
RestClusterClientConfiguration configuration) throws Exception {
+               super(config);
+               this.restClusterClientConfiguration = configuration;
+               this.restClient = new 
RestClient(configuration.getRestClientConfiguration(), executorService);
+       }
+
+       @Override
+       public void shutdown() {
+               try {
+                       // we only call this for legacy reasons to shut down 
components that are started in the ClusterClient constructor
+                       super.shutdown();
+               } catch (Exception e) {
+                       log.error("An error occurred during the client 
shutdown.", e);
+               }
+               this.restClient.shutdown(Time.seconds(5));
+               
org.apache.flink.runtime.concurrent.Executors.gracefulShutdown(5, 
TimeUnit.SECONDS, this.executorService);
+       }
+
+       @Override
+       protected JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader 
classLoader) throws ProgramInvocationException {
+               log.info("Submitting job.");
+               try {
+                       // temporary hack for FLIP-6 since slot-sharing isn't 
implemented yet
+                       jobGraph.setAllowQueuedScheduling(true);
+                       submitJob(jobGraph);
+               } catch (JobSubmissionException e) {
+                       throw new ProgramInvocationException(e);
+               }
+               // don't return just a JobSubmissionResult here, the signature 
is lying
+               // The CliFrontend expects this to be a JobExecutionResult
+
+               // TODO: do not exit this method until job is finished
+               return new JobExecutionResult(jobGraph.getJobID(), 1, 
Collections.emptyMap());
+       }
+
+       private void submitJob(JobGraph jobGraph) throws JobSubmissionException 
{
+               log.info("Requesting blob server port.");
+               int blobServerPort;
+               try {
+                       CompletableFuture<BlobServerPortResponseBody> 
portFuture = restClient.sendRequest(
+                               
restClusterClientConfiguration.getRestServerAddress(),
+                               
restClusterClientConfiguration.getRestServerPort(),
+                               BlobServerPortHeaders.getInstance());
+                       blobServerPort = portFuture.get(timeout.toMillis(), 
TimeUnit.MILLISECONDS).port;
+               } catch (Exception e) {
+                       throw new JobSubmissionException(jobGraph.getJobID(), 
"Failed to retrieve blob server port.", e);
+               }
+
+               log.info("Uploading jar files.");
+               try {
+                       InetSocketAddress address = new 
InetSocketAddress(restClusterClientConfiguration.getBlobServerAddress(), 
blobServerPort);
+                       List<PermanentBlobKey> keys = 
BlobClient.uploadJarFiles(address, this.flinkConfig, jobGraph.getJobID(), 
jobGraph.getUserJars());
+                       for (PermanentBlobKey key : keys) {
+                               jobGraph.addBlob(key);
+                       }
+               } catch (Exception e) {
+                       throw new JobSubmissionException(jobGraph.getJobID(), 
"Failed to upload user jars to blob server.", e);
+               }
+
+               log.info("Submitting job graph.");
+               try {
+                       CompletableFuture<JobSubmitResponseBody> responseFuture 
= restClient.sendRequest(
+                               
restClusterClientConfiguration.getRestServerAddress(),
+                               
restClusterClientConfiguration.getRestServerPort(),
+                               JobSubmitHeaders.getInstance(),
+                               new JobSubmitRequestBody(jobGraph));
+                       responseFuture.get(timeout.toMillis(), 
TimeUnit.MILLISECONDS);
+               } catch (Exception e) {
+                       throw new JobSubmissionException(jobGraph.getJobID(), 
"Failed to submit JobGraph.", e);
+               }
+       }
+
+       @Override
+       public void stop(JobID jobID) throws Exception {
+               JobTerminationMessageParameters params = new 
JobTerminationMessageParameters();
+               params.jobPathParameter.resolve(jobID);
+               
params.terminationModeQueryParameter.resolve(Collections.singletonList(TerminationModeQueryParameter.TerminationMode.STOP));
+               CompletableFuture<EmptyResponseBody> responseFuture = 
restClient.sendRequest(
+                       restClusterClientConfiguration.getRestServerAddress(),
+                       restClusterClientConfiguration.getRestServerPort(),
+                       JobTerminationHeaders.getInstance(),
+                       params
+               );
+               responseFuture.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
+       }
+
+       @Override
+       public void cancel(JobID jobID) throws Exception {
+               JobTerminationMessageParameters params = new 
JobTerminationMessageParameters();
+               params.jobPathParameter.resolve(jobID);
+               
params.terminationModeQueryParameter.resolve(Collections.singletonList(TerminationModeQueryParameter.TerminationMode.CANCEL));
+               CompletableFuture<EmptyResponseBody> responseFuture = 
restClient.sendRequest(
+                       restClusterClientConfiguration.getRestServerAddress(),
+                       restClusterClientConfiguration.getRestServerPort(),
+                       JobTerminationHeaders.getInstance(),
+                       params
+               );
+               responseFuture.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
+       }
+
+       @Override
+       public String cancelWithSavepoint(JobID jobId, @Nullable String 
savepointDirectory) throws Exception {
+               throw new UnsupportedOperationException();
+       }
+
+       // ======================================
+       // Legacy stuff we actually implement
+       // ======================================
+
+       @Override
+       public String getClusterIdentifier() {
+               return "Flip-6 Standalone cluster with dispatcher at " + 
restClusterClientConfiguration.getRestServerAddress() + '.';
+       }
+
+       @Override
+       public boolean hasUserJarsInClassPath(List<URL> userJarFiles) {
+               return false;
+       }
+
+       // ======================================
+       // Legacy stuff we ignore
+       // ======================================
+
+       @Override
+       public void waitForClusterToBeReady() {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public String getWebInterfaceURL() {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public GetClusterStatusResponse getClusterStatus() {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       protected List<String> getNewMessages() {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       protected void finalizeCluster() {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public int getMaxSlots() {
+               return 0;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ad380463/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClientConfiguration.java
----------------------------------------------------------------------
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClientConfiguration.java
 
b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClientConfiguration.java
new file mode 100644
index 0000000..788eba9
--- /dev/null
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClientConfiguration.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.program.rest;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.runtime.rest.RestClientConfiguration;
+import org.apache.flink.util.ConfigurationException;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * A configuration object for {@link RestClusterClient}s.
+ */
+public final class RestClusterClientConfiguration {
+
+       private final String blobServerAddress;
+
+       private final RestClientConfiguration restClientConfiguration;
+
+       private final String restServerAddress;
+
+       private final int restServerPort;
+
+       private RestClusterClientConfiguration(
+                       String blobServerAddress,
+                       RestClientConfiguration endpointConfiguration,
+                       String restServerAddress,
+                       int restServerPort) {
+               this.blobServerAddress = 
Preconditions.checkNotNull(blobServerAddress);
+               this.restClientConfiguration = 
Preconditions.checkNotNull(endpointConfiguration);
+               this.restServerAddress = 
Preconditions.checkNotNull(restServerAddress);
+               this.restServerPort = restServerPort;
+       }
+
+       public String getBlobServerAddress() {
+               return blobServerAddress;
+       }
+
+       public String getRestServerAddress() {
+               return restServerAddress;
+       }
+
+       public int getRestServerPort() {
+               return restServerPort;
+       }
+
+       public RestClientConfiguration getRestClientConfiguration() {
+               return restClientConfiguration;
+       }
+
+       public static RestClusterClientConfiguration 
fromConfiguration(Configuration config) throws ConfigurationException {
+               String blobServerAddress = 
config.getString(JobManagerOptions.ADDRESS);
+
+               String serverAddress = 
config.getString(RestOptions.REST_ADDRESS);
+               int serverPort = config.getInteger(RestOptions.REST_PORT);
+
+               RestClientConfiguration restClientConfiguration = 
RestClientConfiguration.fromConfiguration(config);
+
+               return new RestClusterClientConfiguration(blobServerAddress, 
restClientConfiguration, serverAddress, serverPort);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ad380463/flink-clients/src/test/java/org/apache/flink/client/CliFrontendListCancelTest.java
----------------------------------------------------------------------
diff --git 
a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendListCancelTest.java
 
b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendListCancelTest.java
index 725d95a..e52dde1 100644
--- 
a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendListCancelTest.java
+++ 
b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendListCancelTest.java
@@ -19,7 +19,13 @@
 package org.apache.flink.client;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.client.cli.CancelOptions;
+import org.apache.flink.client.cli.CliFrontendParser;
 import org.apache.flink.client.cli.CommandLineOptions;
+import org.apache.flink.client.cli.CustomCommandLine;
+import org.apache.flink.client.cli.Flip6DefaultCLI;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.FlinkUntypedActor;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.instance.AkkaActorGateway;
@@ -30,9 +36,11 @@ import akka.actor.ActorSystem;
 import akka.actor.Props;
 import akka.actor.Status;
 import akka.testkit.JavaTestKit;
+import org.apache.commons.cli.CommandLine;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.mockito.Mockito;
 
 import java.util.UUID;
 
@@ -41,6 +49,14 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.isNull;
+import static org.mockito.Matchers.notNull;
+import static org.mockito.Mockito.times;
+import static org.powermock.api.mockito.PowerMockito.doThrow;
+import static org.powermock.api.mockito.PowerMockito.mock;
+import static org.powermock.api.mockito.PowerMockito.when;
 
 /**
  * Tests for the CANCEL and LIST commands.
@@ -88,47 +104,35 @@ public class CliFrontendListCancelTest {
                        // test cancel properly
                        {
                                JobID jid = new JobID();
-                               String jidString = jid.toString();
 
-                               final UUID leaderSessionID = UUID.randomUUID();
-
-                               final ActorRef jm = 
actorSystem.actorOf(Props.create(
-                                                               
CliJobManager.class,
-                                                               jid,
-                                                               leaderSessionID
-                                               )
-                               );
-
-                               final ActorGateway gateway = new 
AkkaActorGateway(jm, leaderSessionID);
-
-                               String[] parameters = { jidString };
-                               InfoListTestCliFrontend testFrontend = new 
InfoListTestCliFrontend(gateway);
+                               String[] parameters = { jid.toString() };
+                               CancelTestCliFrontend testFrontend = new 
CancelTestCliFrontend(false);
 
                                int retCode = testFrontend.cancel(parameters);
                                assertTrue(retCode == 0);
+
+                               Mockito.verify(testFrontend.client, 
times(1)).cancel(any(JobID.class));
                        }
 
                        // test cancel properly
                        {
-                               JobID jid1 = new JobID();
-                               JobID jid2 = new JobID();
-
-                               final UUID leaderSessionID = UUID.randomUUID();
+                               JobID jid = new JobID();
 
-                               final ActorRef jm = actorSystem.actorOf(
-                                               Props.create(
-                                                               
CliJobManager.class,
-                                                               jid1,
-                                                               leaderSessionID
-                                               )
-                               );
+                               String[] parameters = { jid.toString() };
+                               CancelTestCliFrontend testFrontend = new 
CancelTestCliFrontend(true);
 
-                               final ActorGateway gateway = new 
AkkaActorGateway(jm, leaderSessionID);
+                               int retCode = testFrontend.cancel(parameters);
+                               assertTrue(retCode != 0);
 
-                               String[] parameters = { jid2.toString() };
-                               InfoListTestCliFrontend testFrontend = new 
InfoListTestCliFrontend(gateway);
+                               Mockito.verify(testFrontend.client, 
times(1)).cancel(any(JobID.class));
+                       }
 
-                               assertTrue(testFrontend.cancel(parameters) != 
0);
+                       // test flip6 switch
+                       {
+                               String[] parameters =
+                                       {"-flip6", String.valueOf(new JobID())};
+                               CancelOptions options = 
CliFrontendParser.parseCancelCommand(parameters);
+                               
assertTrue(options.getCommandLine().hasOption(Flip6DefaultCLI.FLIP_6.getOpt()));
                        }
                }
                catch (Exception e) {
@@ -145,56 +149,38 @@ public class CliFrontendListCancelTest {
                {
                        // Cancel with savepoint (no target directory)
                        JobID jid = new JobID();
-                       UUID leaderSessionID = UUID.randomUUID();
-
-                       Props props = Props.create(CliJobManager.class, jid, 
leaderSessionID);
-                       ActorRef jm = actorSystem.actorOf(props);
-                       ActorGateway gateway = new AkkaActorGateway(jm, 
leaderSessionID);
 
                        String[] parameters = { "-s", jid.toString() };
-                       InfoListTestCliFrontend testFrontend = new 
InfoListTestCliFrontend(gateway);
+                       CancelTestCliFrontend testFrontend = new 
CancelTestCliFrontend(false);
                        assertEquals(0, testFrontend.cancel(parameters));
+
+                       Mockito.verify(testFrontend.client, times(1))
+                               .cancelWithSavepoint(any(JobID.class), 
isNull(String.class));
                }
 
                {
                        // Cancel with savepoint (with target directory)
                        JobID jid = new JobID();
-                       UUID leaderSessionID = UUID.randomUUID();
-
-                       Props props = Props.create(CliJobManager.class, jid, 
leaderSessionID, "targetDirectory");
-                       ActorRef jm = actorSystem.actorOf(props);
-                       ActorGateway gateway = new AkkaActorGateway(jm, 
leaderSessionID);
 
                        String[] parameters = { "-s", "targetDirectory", 
jid.toString() };
-                       InfoListTestCliFrontend testFrontend = new 
InfoListTestCliFrontend(gateway);
+                       CancelTestCliFrontend testFrontend = new 
CancelTestCliFrontend(false);
                        assertEquals(0, testFrontend.cancel(parameters));
+
+                       Mockito.verify(testFrontend.client, times(1))
+                               .cancelWithSavepoint(any(JobID.class), 
notNull(String.class));
                }
 
                {
                        // Cancel with savepoint (with target directory), but 
no job ID
-                       JobID jid = new JobID();
-                       UUID leaderSessionID = UUID.randomUUID();
-
-                       Props props = Props.create(CliJobManager.class, jid, 
leaderSessionID, "targetDirectory");
-                       ActorRef jm = actorSystem.actorOf(props);
-                       ActorGateway gateway = new AkkaActorGateway(jm, 
leaderSessionID);
-
                        String[] parameters = { "-s", "targetDirectory" };
-                       InfoListTestCliFrontend testFrontend = new 
InfoListTestCliFrontend(gateway);
+                       CliFrontend testFrontend = new 
CliFrontend(CliFrontendTestUtils.getConfigDir());
                        assertNotEquals(0, testFrontend.cancel(parameters));
                }
 
                {
                        // Cancel with savepoint (no target directory) and no 
job ID
-                       JobID jid = new JobID();
-                       UUID leaderSessionID = UUID.randomUUID();
-
-                       Props props = Props.create(CliJobManager.class, jid, 
leaderSessionID);
-                       ActorRef jm = actorSystem.actorOf(props);
-                       ActorGateway gateway = new AkkaActorGateway(jm, 
leaderSessionID);
-
                        String[] parameters = { "-s" };
-                       InfoListTestCliFrontend testFrontend = new 
InfoListTestCliFrontend(gateway);
+                       CliFrontend testFrontend = new 
CliFrontend(CliFrontendTestUtils.getConfigDir());
                        assertNotEquals(0, testFrontend.cancel(parameters));
                }
        }
@@ -234,11 +220,32 @@ public class CliFrontendListCancelTest {
                }
        }
 
+       private static final class CancelTestCliFrontend extends CliFrontend {
+               private final ClusterClient client;
+
+               CancelTestCliFrontend(boolean reject) throws Exception {
+                       super(CliFrontendTestUtils.getConfigDir());
+                       this.client = mock(ClusterClient.class);
+                       if (reject) {
+                               doThrow(new IllegalArgumentException("Test 
exception")).when(client).cancel(any(JobID.class));
+                               doThrow(new IllegalArgumentException("Test 
exception")).when(client).cancelWithSavepoint(any(JobID.class), anyString());
+                       }
+               }
+
+               @Override
+               public CustomCommandLine getActiveCustomCommandLine(CommandLine 
commandLine) {
+                       CustomCommandLine ccl = mock(CustomCommandLine.class);
+                       when(ccl.retrieveCluster(any(CommandLine.class), 
any(Configuration.class), anyString()))
+                               .thenReturn(client);
+                       return ccl;
+               }
+       }
+
        private static final class InfoListTestCliFrontend extends CliFrontend {
 
                private ActorGateway jobManagerGateway;
 
-               public InfoListTestCliFrontend(ActorGateway jobManagerGateway) 
throws Exception {
+               InfoListTestCliFrontend(ActorGateway jobManagerGateway) throws 
Exception {
                        super(CliFrontendTestUtils.getConfigDir());
                        this.jobManagerGateway = jobManagerGateway;
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/ad380463/flink-clients/src/test/java/org/apache/flink/client/CliFrontendRunTest.java
----------------------------------------------------------------------
diff --git 
a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendRunTest.java 
b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendRunTest.java
index e453d37..0edc444 100644
--- 
a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendRunTest.java
+++ 
b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendRunTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.client;
 
 import org.apache.flink.client.cli.CliFrontendParser;
+import org.apache.flink.client.cli.Flip6DefaultCLI;
 import org.apache.flink.client.cli.RunOptions;
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.client.program.PackagedProgram;
@@ -128,6 +129,14 @@ public class CliFrontendRunTest {
                                assertEquals("--arg2", 
options.getProgramArgs()[3]);
                                assertEquals("value2", 
options.getProgramArgs()[4]);
                        }
+
+                       // test flip6 switch
+                       {
+                               String[] parameters =
+                                       {"-flip6", getTestJarPath()};
+                               RunOptions options = 
CliFrontendParser.parseRunCommand(parameters);
+                               
assertTrue(options.getCommandLine().hasOption(Flip6DefaultCLI.FLIP_6.getOpt()));
+                       }
                }
                catch (Exception e) {
                        e.printStackTrace();

http://git-wip-us.apache.org/repos/asf/flink/blob/ad380463/flink-clients/src/test/java/org/apache/flink/client/CliFrontendStopTest.java
----------------------------------------------------------------------
diff --git 
a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendStopTest.java 
b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendStopTest.java
index fef4880..ab81713 100644
--- 
a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendStopTest.java
+++ 
b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendStopTest.java
@@ -19,44 +19,37 @@
 package org.apache.flink.client;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.client.cli.CommandLineOptions;
-import org.apache.flink.runtime.akka.FlinkUntypedActor;
-import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.instance.AkkaActorGateway;
-import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.client.cli.CliFrontendParser;
+import org.apache.flink.client.cli.CustomCommandLine;
+import org.apache.flink.client.cli.Flip6DefaultCLI;
+import org.apache.flink.client.cli.StopOptions;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.util.TestLogger;
 
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.actor.Props;
-import akka.actor.Status;
-import akka.testkit.JavaTestKit;
-import org.junit.AfterClass;
+import org.apache.commons.cli.CommandLine;
 import org.junit.BeforeClass;
 import org.junit.Test;
-
-import java.util.UUID;
+import org.mockito.Mockito;
 
 import static org.apache.flink.client.CliFrontendTestUtils.pipeSystemOutToNull;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.times;
+import static org.powermock.api.mockito.PowerMockito.doThrow;
+import static org.powermock.api.mockito.PowerMockito.mock;
+import static org.powermock.api.mockito.PowerMockito.when;
 
 /**
  * Tests for the STOP command.
  */
 public class CliFrontendStopTest extends TestLogger {
 
-       private static ActorSystem actorSystem;
-
        @BeforeClass
        public static void setup() {
                pipeSystemOutToNull();
-               actorSystem = ActorSystem.create("TestingActorSystem");
-       }
-
-       @AfterClass
-       public static void teardown() {
-               JavaTestKit.shutdownActorSystem(actorSystem);
-               actorSystem = null;
        }
 
        @Test
@@ -82,82 +75,53 @@ public class CliFrontendStopTest extends TestLogger {
                        JobID jid = new JobID();
                        String jidString = jid.toString();
 
-                       final UUID leaderSessionID = UUID.randomUUID();
-                       final ActorRef jm = 
actorSystem.actorOf(Props.create(CliJobManager.class, jid, leaderSessionID));
-
-                       final ActorGateway gateway = new AkkaActorGateway(jm, 
leaderSessionID);
-
                        String[] parameters = { jidString };
-                       StopTestCliFrontend testFrontend = new 
StopTestCliFrontend(gateway);
+                       StopTestCliFrontend testFrontend = new 
StopTestCliFrontend(false);
 
                        int retCode = testFrontend.stop(parameters);
-                       assertTrue(retCode == 0);
+                       assertEquals(0, retCode);
+
+                       Mockito.verify(testFrontend.client, 
times(1)).stop(any(JobID.class));
                }
 
                // test unknown job Id
                {
-                       JobID jid1 = new JobID();
-                       JobID jid2 = new JobID();
-
-                       final UUID leaderSessionID = UUID.randomUUID();
-                       final ActorRef jm = 
actorSystem.actorOf(Props.create(CliJobManager.class, jid1, leaderSessionID));
-
-                       final ActorGateway gateway = new AkkaActorGateway(jm, 
leaderSessionID);
+                       JobID jid = new JobID();
 
-                       String[] parameters = { jid2.toString() };
-                       StopTestCliFrontend testFrontend = new 
StopTestCliFrontend(gateway);
+                       String[] parameters = { jid.toString() };
+                       StopTestCliFrontend testFrontend = new 
StopTestCliFrontend(true);
 
                        assertTrue(testFrontend.stop(parameters) != 0);
-               }
-       }
 
-       private static final class StopTestCliFrontend extends CliFrontend {
-
-               private ActorGateway jobManagerGateway;
-
-               public StopTestCliFrontend(ActorGateway jobManagerGateway) 
throws Exception {
-                       super(CliFrontendTestUtils.getConfigDir());
-                       this.jobManagerGateway = jobManagerGateway;
+                       Mockito.verify(testFrontend.client, 
times(1)).stop(any(JobID.class));
                }
 
-               @Override
-               public ActorGateway getJobManagerGateway(CommandLineOptions 
options) {
-                       return jobManagerGateway;
+               // test flip6 switch
+               {
+                       String[] parameters =
+                               {"-flip6", String.valueOf(new JobID())};
+                       StopOptions options = 
CliFrontendParser.parseStopCommand(parameters);
+                       
assertTrue(options.getCommandLine().hasOption(Flip6DefaultCLI.FLIP_6.getOpt()));
                }
        }
 
-       private static final class CliJobManager extends FlinkUntypedActor {
-               private final JobID jobID;
-               private final UUID leaderSessionID;
-
-               public CliJobManager(final JobID jobID, final UUID 
leaderSessionID) {
-                       this.jobID = jobID;
-                       this.leaderSessionID = leaderSessionID;
-               }
+       private static final class StopTestCliFrontend extends CliFrontend {
+               private final ClusterClient client;
 
-               @Override
-               public void handleMessage(Object message) {
-                       if (message instanceof 
JobManagerMessages.RequestTotalNumberOfSlots$) {
-                               getSender().tell(decorateMessage(1), getSelf());
-                       } else if (message instanceof 
JobManagerMessages.StopJob) {
-                               JobManagerMessages.StopJob stopJob = 
(JobManagerMessages.StopJob) message;
-
-                               if (jobID != null && 
jobID.equals(stopJob.jobID())) {
-                                       getSender().tell(decorateMessage(new 
Status.Success(new Object())), getSelf());
-                               } else {
-                                       getSender()
-                                                       
.tell(decorateMessage(new Status.Failure(new Exception(
-                                                                       "Wrong or no JobID"))), getSelf());
-                               }
-                       } else if (message instanceof 
JobManagerMessages.RequestRunningJobsStatus$) {
-                               getSender().tell(decorateMessage(new 
JobManagerMessages.RunningJobsStatus()),
-                                               getSelf());
+               StopTestCliFrontend(boolean reject) throws Exception {
+                       super(CliFrontendTestUtils.getConfigDir());
+                       this.client = mock(ClusterClient.class);
+                       if (reject) {
+                               doThrow(new IllegalArgumentException("Test exception")).when(client).stop(any(JobID.class));
                        }
                }
 
                @Override
-               protected UUID getLeaderSessionID() {
-                       return leaderSessionID;
+               public CustomCommandLine getActiveCustomCommandLine(CommandLine 
commandLine) {
+                       CustomCommandLine ccl = mock(CustomCommandLine.class);
+                       when(ccl.retrieveCluster(any(CommandLine.class), 
any(Configuration.class), anyString()))
+                               .thenReturn(client);
+                       return ccl;
                }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ad380463/flink-clients/src/test/java/org/apache/flink/client/program/ClusterClientTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ClusterClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/ClusterClientTest.java
index 97a881c..98c7d26 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/ClusterClientTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/ClusterClientTest.java
@@ -18,12 +18,22 @@
 
 package org.apache.flink.client.program;
 
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.instance.DummyActorGateway;
+import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.util.TestLogger;
 
+import org.junit.Assert;
 import org.junit.Test;
 
+import scala.concurrent.Future;
+import scala.concurrent.Future$;
+import scala.concurrent.duration.FiniteDuration;
+
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.verify;
@@ -51,4 +61,137 @@ public class ClusterClientTest extends TestLogger {
                verify(highAvailabilityServices, 
never()).closeAndCleanupAllData();
                verify(highAvailabilityServices).close();
        }
+
+       @Test
+       public void testClusterClientStop() throws Exception {
+               Configuration config = new Configuration();
+               config.setString(JobManagerOptions.ADDRESS, "localhost");
+
+               JobID jobID = new JobID();
+               TestStopActorGateway gateway = new TestStopActorGateway(jobID);
+               ClusterClient clusterClient = new TestClusterClient(config, 
gateway);
+               try {
+                       clusterClient.stop(jobID);
+                       Assert.assertTrue(gateway.messageArrived);
+               } finally {
+                       clusterClient.shutdown();
+               }
+       }
+
+       @Test
+       public void testClusterClientCancel() throws Exception {
+               Configuration config = new Configuration();
+               config.setString(JobManagerOptions.ADDRESS, "localhost");
+
+               JobID jobID = new JobID();
+               TestCancelActorGateway gateway = new 
TestCancelActorGateway(jobID);
+               ClusterClient clusterClient = new TestClusterClient(config, 
gateway);
+               try {
+                       clusterClient.cancel(jobID);
+                       Assert.assertTrue(gateway.messageArrived);
+               } finally {
+                       clusterClient.shutdown();
+               }
+       }
+
+       @Test
+       public void testClusterClientCancelWithSavepoint() throws Exception {
+               Configuration config = new Configuration();
+               config.setString(JobManagerOptions.ADDRESS, "localhost");
+
+               JobID jobID = new JobID();
+               String savepointPath = "/test/path";
+               TestCancelWithSavepointActorGateway gateway = new 
TestCancelWithSavepointActorGateway(jobID, savepointPath);
+               ClusterClient clusterClient = new TestClusterClient(config, 
gateway);
+               try {
+                       clusterClient.cancelWithSavepoint(jobID, savepointPath);
+                       Assert.assertTrue(gateway.messageArrived);
+               } finally {
+                       clusterClient.shutdown();
+               }
+       }
+
+       private static class TestStopActorGateway extends DummyActorGateway {
+
+               private final JobID expectedJobID;
+               private volatile boolean messageArrived = false;
+
+               TestStopActorGateway(JobID expectedJobID) {
+                       this.expectedJobID = expectedJobID;
+               }
+
+               @Override
+               public Future<Object> ask(Object message, FiniteDuration 
timeout) {
+                       messageArrived = true;
+                       if (message instanceof JobManagerMessages.StopJob) {
+                               JobManagerMessages.StopJob stopJob = 
(JobManagerMessages.StopJob) message;
+                               Assert.assertEquals(expectedJobID, 
stopJob.jobID());
+                               return Future$.MODULE$.successful(new 
JobManagerMessages.StoppingSuccess(stopJob.jobID()));
+                       }
+                       Assert.fail("Expected StopJob message, got: " + 
message.getClass());
+                       return null;
+               }
+       }
+
+       private static class TestCancelActorGateway extends DummyActorGateway {
+
+               private final JobID expectedJobID;
+               private volatile boolean messageArrived = false;
+
+               TestCancelActorGateway(JobID expectedJobID) {
+                       this.expectedJobID = expectedJobID;
+               }
+
+               @Override
+               public Future<Object> ask(Object message, FiniteDuration 
timeout) {
+                       messageArrived = true;
+                       if (message instanceof JobManagerMessages.CancelJob) {
+                               JobManagerMessages.CancelJob cancelJob = 
(JobManagerMessages.CancelJob) message;
+                               Assert.assertEquals(expectedJobID, 
cancelJob.jobID());
+                               return Future$.MODULE$.successful(new 
JobManagerMessages.CancellationSuccess(cancelJob.jobID(), null));
+                       }
+                       Assert.fail("Expected CancelJob message, got: " + 
message.getClass());
+                       return null;
+               }
+       }
+
+       private static class TestCancelWithSavepointActorGateway extends 
DummyActorGateway {
+
+               private final JobID expectedJobID;
+               private final String expectedTargetDirectory;
+               private volatile boolean messageArrived = false;
+
+               TestCancelWithSavepointActorGateway(JobID expectedJobID, String 
expectedTargetDirectory) {
+                       this.expectedJobID = expectedJobID;
+                       this.expectedTargetDirectory = expectedTargetDirectory;
+               }
+
+               @Override
+               public Future<Object> ask(Object message, FiniteDuration 
timeout) {
+                       messageArrived = true;
+                       if (message instanceof 
JobManagerMessages.CancelJobWithSavepoint) {
+                               JobManagerMessages.CancelJobWithSavepoint 
cancelJob = (JobManagerMessages.CancelJobWithSavepoint) message;
+                               Assert.assertEquals(expectedJobID, 
cancelJob.jobID());
+                               Assert.assertEquals(expectedTargetDirectory, 
cancelJob.savepointDirectory());
+                               return Future$.MODULE$.successful(new 
JobManagerMessages.CancellationSuccess(cancelJob.jobID(), null));
+                       }
+                       Assert.fail("Expected CancelJobWithSavepoint message, got: " + message.getClass());
+                       return null;
+               }
+       }
+
+       private static class TestClusterClient extends StandaloneClusterClient {
+
+               private final ActorGateway jobmanagerGateway;
+
+               public TestClusterClient(Configuration config, ActorGateway 
jobmanagerGateway) throws Exception {
+                       super(config);
+                       this.jobmanagerGateway = jobmanagerGateway;
+               }
+
+               @Override
+               public ActorGateway getJobManagerGateway() {
+                       return jobmanagerGateway;
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ad380463/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
new file mode 100644
index 0000000..617dd38
--- /dev/null
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.program.rest;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.runtime.dispatcher.Dispatcher;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.rest.RestServerEndpoint;
+import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
+import org.apache.flink.runtime.rest.messages.BlobServerPortHeaders;
+import org.apache.flink.runtime.rest.messages.BlobServerPortResponseBody;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
+import org.apache.flink.runtime.rest.messages.JobTerminationHeaders;
+import org.apache.flink.runtime.rest.messages.JobTerminationMessageParameters;
+import org.apache.flink.runtime.rest.messages.TerminationModeQueryParameter;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitHeaders;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitRequestBody;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitResponseBody;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import javax.annotation.Nonnull;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for the {@link RestClusterClient}.
+ */
+public class RestClusterClientTest extends TestLogger {
+
+       private static final String restAddress = "http://localhost:1234";
+       private static final Dispatcher mockRestfulGateway = 
mock(Dispatcher.class);
+       private static final GatewayRetriever<DispatcherGateway> 
mockGatewayRetriever = mock(GatewayRetriever.class);
+
+       static {
+               
when(mockRestfulGateway.requestRestAddress(any(Time.class))).thenReturn(CompletableFuture.completedFuture(restAddress));
+               
when(mockGatewayRetriever.getNow()).thenReturn(Optional.of(mockRestfulGateway));
+       }
+
+       @Test
+       public void testJobSubmitCancelStop() throws Exception {
+
+               Configuration config = new Configuration();
+               config.setString(JobManagerOptions.ADDRESS, "localhost");
+
+               RestServerEndpointConfiguration rsec = 
RestServerEndpointConfiguration.fromConfiguration(config);
+
+               TestBlobServerPortHandler portHandler = new 
TestBlobServerPortHandler();
+               TestJobSubmitHandler submitHandler = new TestJobSubmitHandler();
+               TestJobTerminationHandler terminationHandler = new 
TestJobTerminationHandler();
+
+               RestServerEndpoint rse = new RestServerEndpoint(rsec) {
+                       @Override
+                       protected Collection<Tuple2<RestHandlerSpecification, 
ChannelInboundHandler>> initializeHandlers(CompletableFuture<String> 
restAddressFuture) {
+
+                               Collection<Tuple2<RestHandlerSpecification, 
ChannelInboundHandler>> handlers = new ArrayList<>();
+                               
handlers.add(Tuple2.of(portHandler.getMessageHeaders(), portHandler));
+                               
handlers.add(Tuple2.of(submitHandler.getMessageHeaders(), submitHandler));
+                               
handlers.add(Tuple2.of(terminationHandler.getMessageHeaders(), 
terminationHandler));
+                               return handlers;
+                       }
+               };
+
+               RestClusterClient rcc = new RestClusterClient(config);
+               try {
+                       rse.start();
+
+                       JobGraph job = new JobGraph("testjob");
+                       JobID id = job.getJobID();
+
+                       Assert.assertFalse(portHandler.portRetrieved);
+                       Assert.assertFalse(submitHandler.jobSubmitted);
+                       rcc.submitJob(job, ClassLoader.getSystemClassLoader());
+                       Assert.assertTrue(portHandler.portRetrieved);
+                       Assert.assertTrue(submitHandler.jobSubmitted);
+
+                       Assert.assertFalse(terminationHandler.jobCanceled);
+                       rcc.cancel(id);
+                       Assert.assertTrue(terminationHandler.jobCanceled);
+
+                       Assert.assertFalse(terminationHandler.jobStopped);
+                       rcc.stop(id);
+                       Assert.assertTrue(terminationHandler.jobStopped);
+
+               } finally {
+                       rcc.shutdown();
+                       rse.shutdown(Time.seconds(5));
+               }
+       }
+
+       private static class TestBlobServerPortHandler extends 
AbstractRestHandler<DispatcherGateway, EmptyRequestBody, 
BlobServerPortResponseBody, EmptyMessageParameters> {
+               private volatile boolean portRetrieved = false;
+
+               private TestBlobServerPortHandler() {
+                       super(
+                               CompletableFuture.completedFuture(restAddress),
+                               mockGatewayRetriever,
+                               RpcUtils.INF_TIMEOUT,
+                               BlobServerPortHeaders.getInstance());
+               }
+
+               @Override
+               protected CompletableFuture<BlobServerPortResponseBody> 
handleRequest(@Nonnull HandlerRequest<EmptyRequestBody, EmptyMessageParameters> 
request, @Nonnull DispatcherGateway gateway) throws RestHandlerException {
+                       portRetrieved = true;
+                       return CompletableFuture.completedFuture(new 
BlobServerPortResponseBody(12000));
+               }
+       }
+
+       private static class TestJobSubmitHandler extends 
AbstractRestHandler<DispatcherGateway, JobSubmitRequestBody, 
JobSubmitResponseBody, EmptyMessageParameters> {
+               private volatile boolean jobSubmitted = false;
+
+               private TestJobSubmitHandler() {
+                       super(
+                               CompletableFuture.completedFuture(restAddress),
+                               mockGatewayRetriever,
+                               RpcUtils.INF_TIMEOUT,
+                               JobSubmitHeaders.getInstance());
+               }
+
+               @Override
+               protected CompletableFuture<JobSubmitResponseBody> 
handleRequest(@Nonnull HandlerRequest<JobSubmitRequestBody, 
EmptyMessageParameters> request, @Nonnull DispatcherGateway gateway) throws 
RestHandlerException {
+                       jobSubmitted = true;
+                       return CompletableFuture.completedFuture(new 
JobSubmitResponseBody("/url"));
+               }
+       }
+
+       private static class TestJobTerminationHandler extends 
AbstractRestHandler<DispatcherGateway, EmptyRequestBody, EmptyResponseBody, 
JobTerminationMessageParameters> {
+               private volatile boolean jobCanceled = false;
+               private volatile boolean jobStopped = false;
+
+               private TestJobTerminationHandler() {
+                       super(
+                               CompletableFuture.completedFuture(restAddress),
+                               mockGatewayRetriever,
+                               RpcUtils.INF_TIMEOUT,
+                               JobTerminationHeaders.getInstance());
+               }
+
+               @Override
+               protected CompletableFuture<EmptyResponseBody> 
handleRequest(@Nonnull HandlerRequest<EmptyRequestBody, 
JobTerminationMessageParameters> request, @Nonnull DispatcherGateway gateway) 
throws RestHandlerException {
+                       switch 
(request.getQueryParameter(TerminationModeQueryParameter.class).get(0)) {
+                               case CANCEL:
+                                       jobCanceled = true;
+                                       break;
+                               case STOP:
+                                       jobStopped = true;
+                                       break;
+                       }
+                       return 
CompletableFuture.completedFuture(EmptyResponseBody.getInstance());
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ad380463/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index 4d89dc8..bce2bed 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -312,6 +312,11 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
                }
        }
 
+       @Override
+       public CompletableFuture<Integer> getBlobServerPort(Time timeout) {
+               return 
CompletableFuture.completedFuture(jobManagerServices.blobServer.getPort());
+       }
+
        /**
         * Cleans up the job related data from the dispatcher. If cleanupHA is 
true, then
         * the data will also be removed from HA.

http://git-wip-us.apache.org/repos/asf/flink/blob/ad380463/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherGateway.java
index fe7b91e..12cbbfb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherGateway.java
@@ -71,4 +71,12 @@ public interface DispatcherGateway extends FencedRpcGateway<DispatcherId>, Restf
         * @return A future acknowledge if the stopping succeeded
         */
        CompletableFuture<Acknowledge> stopJob(JobID jobId, @RpcTimeout Time 
timeout);
+
+       /**
+        * Returns the port of the blob server.
+        *
+        * @param timeout of the operation
+        * @return A future integer of the blob server port
+        */
+       CompletableFuture<Integer> getBlobServerPort(@RpcTimeout Time timeout);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ad380463/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
index 8471078..d64e649 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
@@ -27,7 +27,9 @@ import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
 import org.apache.flink.runtime.rest.handler.LegacyRestHandlerAdapter;
 import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
 import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
+import org.apache.flink.runtime.rest.handler.job.BlobServerPortHandler;
 import org.apache.flink.runtime.rest.handler.job.JobConfigHandler;
+import org.apache.flink.runtime.rest.handler.job.JobSubmitHandler;
 import org.apache.flink.runtime.rest.handler.job.JobTerminationHandler;
 import 
org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointConfigHandler;
 import 
org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointStatisticsHandler;
@@ -192,6 +194,12 @@ public class DispatcherRestEndpoint extends 
RestServerEndpoint {
                handlers.add(Tuple2.of(CheckpointConfigHeaders.getInstance(), 
checkpointConfigHandler));
                
handlers.add(Tuple2.of(CheckpointStatisticsHeaders.getInstance(), 
checkpointStatisticsHandler));
 
+               BlobServerPortHandler blobServerPortHandler = new 
BlobServerPortHandler(restAddressFuture, leaderRetriever, timeout);
+               
handlers.add(Tuple2.of(blobServerPortHandler.getMessageHeaders(), 
blobServerPortHandler));
+
+               JobSubmitHandler jobSubmitHandler = new 
JobSubmitHandler(restAddressFuture, leaderRetriever, timeout);
+               handlers.add(Tuple2.of(jobSubmitHandler.getMessageHeaders(), 
jobSubmitHandler));
+
                // This handler MUST be added last, as it otherwise masks all 
subsequent GET handlers
                optWebContent.ifPresent(
                        webContent -> 
handlers.add(Tuple2.of(WebContentHandlerSpecification.getInstance(), 
webContent)));

http://git-wip-us.apache.org/repos/asf/flink/blob/ad380463/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
index d09aad9..18766c0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
@@ -54,6 +54,8 @@ import java.util.concurrent.TimeUnit;
  * An abstract class for netty-based REST server endpoints.
  */
 public abstract class RestServerEndpoint {
+
+       public static final int MAX_REQUEST_SIZE_BYTES = 1024 * 1024 * 10;
        protected final Logger log = LoggerFactory.getLogger(getClass());
 
        private final Object lock = new Object();
@@ -120,7 +122,7 @@ public abstract class RestServerEndpoint {
 
                                        ch.pipeline()
                                                .addLast(new HttpServerCodec())
-                                               .addLast(new 
HttpObjectAggregator(1024 * 1024 * 10))
+                                               .addLast(new 
HttpObjectAggregator(MAX_REQUEST_SIZE_BYTES))
                                                .addLast(handler.name(), 
handler)
                                                .addLast(new 
PipelineErrorHandler(log));
                                }

http://git-wip-us.apache.org/repos/asf/flink/blob/ad380463/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/BlobServerPortHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/BlobServerPortHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/BlobServerPortHandler.java
new file mode 100644
index 0000000..cdf562f
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/BlobServerPortHandler.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.messages.BlobServerPortHeaders;
+import org.apache.flink.runtime.rest.messages.BlobServerPortResponseBody;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.ExceptionUtils;
+
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import javax.annotation.Nonnull;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+
+/**
+ * REST handler that returns the port that the blob server runs on.
+ *
+ * <p>The port is requested from the {@link DispatcherGateway} by calling
+ * {@code getBlobServerPort(timeout)} and the result is wrapped in a
+ * {@link BlobServerPortResponseBody}. If that future completes exceptionally,
+ * the failure is re-thrown as a {@link RestHandlerException} with the message
+ * "Failed to retrieve blob server port." and the status
+ * {@code INTERNAL_SERVER_ERROR}.
+ *
+ * <p>The message headers used by this handler are
+ * {@link BlobServerPortHeaders}; the constructor arguments are forwarded
+ * unchanged to {@link AbstractRestHandler}.
+ */
+public final class BlobServerPortHandler extends 
AbstractRestHandler<DispatcherGateway, EmptyRequestBody, 
BlobServerPortResponseBody, EmptyMessageParameters> {
+
+       public BlobServerPortHandler(CompletableFuture<String> 
localRestAddress, GatewayRetriever<DispatcherGateway> leaderRetriever, Time 
timeout) {
+               super(localRestAddress, leaderRetriever, timeout, 
BlobServerPortHeaders.getInstance());
+       }
+
+       @Override
+       protected CompletableFuture<BlobServerPortResponseBody> 
handleRequest(@Nonnull HandlerRequest<EmptyRequestBody, EmptyMessageParameters> 
request, @Nonnull DispatcherGateway gateway) throws RestHandlerException {
+               return gateway
+                       .getBlobServerPort(timeout)
+                       .thenApply(BlobServerPortResponseBody::new) // wrap the returned port in the response body
+                       .exceptionally(error -> { // turn any failure into a REST-level error
+                               throw new CompletionException(new 
RestHandlerException(
+                                       "Failed to retrieve blob server port.",
+                                       
HttpResponseStatus.INTERNAL_SERVER_ERROR,
+                                       
ExceptionUtils.stripCompletionException(error))); // original failure is passed on as the cause
+                       });
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ad380463/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java
new file mode 100644
index 0000000..f810b5a
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitHeaders;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitRequestBody;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitResponseBody;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import javax.annotation.Nonnull;
+
+import java.io.ByteArrayInputStream;
+import java.io.ObjectInputStream;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * This handler can be used to submit jobs to a Flink cluster.
+ */
+public final class JobSubmitHandler extends 
AbstractRestHandler<DispatcherGateway, JobSubmitRequestBody, 
JobSubmitResponseBody, EmptyMessageParameters> {
+
+       public JobSubmitHandler(CompletableFuture<String> localRestAddress, 
GatewayRetriever<DispatcherGateway> leaderRetriever, Time timeout) {
+               super(localRestAddress, leaderRetriever, timeout, 
JobSubmitHeaders.getInstance());
+       }
+
+       @Override
+       protected CompletableFuture<JobSubmitResponseBody> 
handleRequest(@Nonnull HandlerRequest<JobSubmitRequestBody, 
EmptyMessageParameters> request, @Nonnull DispatcherGateway gateway) throws 
RestHandlerException {
+               JobGraph jobGraph;
+               try {
+                       ObjectInputStream objectIn = new ObjectInputStream(new 
ByteArrayInputStream(request.getRequestBody().serializedJobGraph));
+                       jobGraph = (JobGraph) objectIn.readObject();
+               } catch (Exception e) {
+                       throw new RestHandlerException(
+                               "Failed to deserialize JobGraph.",
+                               HttpResponseStatus.BAD_REQUEST,
+                               e);
+               }
+
+               return gateway.submitJob(jobGraph, timeout)
+                       .thenApply(ack -> new JobSubmitResponseBody("/jobs/" + 
jobGraph.getJobID()));
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ad380463/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/BlobServerPortHeaders.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/BlobServerPortHeaders.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/BlobServerPortHeaders.java
new file mode 100644
index 0000000..8edec16
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/BlobServerPortHeaders.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import org.apache.flink.runtime.rest.HttpMethodWrapper;
+
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+/**
+ * These headers define the protocol for querying the port of the blob server.
+ *
+ * <p>The request is a {@code GET} on {@code /blobserver/port} with an
+ * {@link EmptyRequestBody} and {@link EmptyMessageParameters}. The response is
+ * a {@link BlobServerPortResponseBody} sent with status {@code OK}.
+ *
+ * <p>The class has a private constructor and a single shared instance that is
+ * obtained through {@link #getInstance()}.
+ */
+public class BlobServerPortHeaders implements MessageHeaders<EmptyRequestBody, 
BlobServerPortResponseBody, EmptyMessageParameters> {
+
+       private static final String URL = "/blobserver/port";
+       private static final BlobServerPortHeaders INSTANCE = new 
BlobServerPortHeaders();
+
+       private BlobServerPortHeaders() {
+       }
+
+       @Override
+       public Class<EmptyRequestBody> getRequestClass() {
+               return EmptyRequestBody.class;
+       }
+
+       @Override
+       public HttpMethodWrapper getHttpMethod() {
+               return HttpMethodWrapper.GET;
+       }
+
+       @Override
+       public String getTargetRestEndpointURL() {
+               return URL;
+       }
+
+       @Override
+       public Class<BlobServerPortResponseBody> getResponseClass() {
+               return BlobServerPortResponseBody.class;
+       }
+
+       @Override
+       public HttpResponseStatus getResponseStatusCode() {
+               return HttpResponseStatus.OK;
+       }
+
+       @Override
+       public EmptyMessageParameters getUnresolvedMessageParameters() {
+               return EmptyMessageParameters.getInstance();
+       }
+
+       public static BlobServerPortHeaders getInstance() {
+               return INSTANCE; // the one shared instance created above
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ad380463/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/BlobServerPortResponseBody.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/BlobServerPortResponseBody.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/BlobServerPortResponseBody.java
new file mode 100644
index 0000000..846475f
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/BlobServerPortResponseBody.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+/**
+ * Response containing the blob server port.
+ *
+ * <p>The port is exposed as the JSON property {@code "port"}; the annotated
+ * constructor is the {@code @JsonCreator} and reads the same property.
+ *
+ * <p>Two instances are equal exactly when their ports are equal, and
+ * {@link #hashCode()} is derived from the port only.
+ */
+public final class BlobServerPortResponseBody implements ResponseBody {
+
+       static final String FIELD_NAME_PORT = "port";
+
+       /**
+        * The port of the blob server.
+        */
+       @JsonProperty(FIELD_NAME_PORT)
+       public final int port;
+
+       @JsonCreator
+       public BlobServerPortResponseBody(
+               @JsonProperty(FIELD_NAME_PORT) int port) {
+
+               this.port = port;
+       }
+
+       @Override
+       public int hashCode() {
+               return 67 * port; // depends on the port only, consistent with equals
+       }
+
+       @Override
+       public boolean equals(Object object) {
+               if (object instanceof BlobServerPortResponseBody) { // also false for null
+                       BlobServerPortResponseBody other = 
(BlobServerPortResponseBody) object;
+                       return this.port == other.port;
+               }
+               return false;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ad380463/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobTerminationMessageParameters.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobTerminationMessageParameters.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobTerminationMessageParameters.java
index fd87316..a59dc83 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobTerminationMessageParameters.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobTerminationMessageParameters.java
@@ -28,8 +28,8 @@ import java.util.Collections;
  */
 public class JobTerminationMessageParameters extends MessageParameters {
 
-       private final JobIDPathParameter jobPathParameter = new 
JobIDPathParameter();
-       private final TerminationModeQueryParameter 
terminationModeQueryParameter = new TerminationModeQueryParameter();
+       public final JobIDPathParameter jobPathParameter = new 
JobIDPathParameter();
+       public final TerminationModeQueryParameter 
terminationModeQueryParameter = new TerminationModeQueryParameter();
 
        @Override
        public Collection<MessagePathParameter<?>> getPathParameters() {

http://git-wip-us.apache.org/repos/asf/flink/blob/ad380463/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobSubmitHeaders.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobSubmitHeaders.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobSubmitHeaders.java
new file mode 100644
index 0000000..6235214
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobSubmitHeaders.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job;
+
+import org.apache.flink.runtime.rest.HttpMethodWrapper;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+/**
+ * These headers define the protocol for submitting a job to a Flink cluster.
+ *
+ * <p>The request is a {@code POST} on {@code /jobs} with a
+ * {@link JobSubmitRequestBody} and {@link EmptyMessageParameters}. The
+ * response is a {@link JobSubmitResponseBody} sent with status
+ * {@code ACCEPTED}.
+ *
+ * <p>The class has a private constructor and a single shared instance that is
+ * obtained through {@link #getInstance()}.
+ */
+public class JobSubmitHeaders implements MessageHeaders<JobSubmitRequestBody, 
JobSubmitResponseBody, EmptyMessageParameters> {
+
+       private static final String URL = "/jobs";
+       private static final JobSubmitHeaders INSTANCE = new JobSubmitHeaders();
+
+       private JobSubmitHeaders() {
+       }
+
+       @Override
+       public Class<JobSubmitRequestBody> getRequestClass() {
+               return JobSubmitRequestBody.class;
+       }
+
+       @Override
+       public HttpMethodWrapper getHttpMethod() {
+               return HttpMethodWrapper.POST;
+       }
+
+       @Override
+       public String getTargetRestEndpointURL() {
+               return URL;
+       }
+
+       @Override
+       public Class<JobSubmitResponseBody> getResponseClass() {
+               return JobSubmitResponseBody.class;
+       }
+
+       @Override
+       public HttpResponseStatus getResponseStatusCode() {
+               return HttpResponseStatus.ACCEPTED;
+       }
+
+       @Override
+       public EmptyMessageParameters getUnresolvedMessageParameters() {
+               return EmptyMessageParameters.getInstance();
+       }
+
+       public static JobSubmitHeaders getInstance() {
+               return INSTANCE; // the one shared instance created above
+       }
+}

Reply via email to