[FLINK-2111] Add "stop" signal to cleanly shutdown streaming jobs

- added JobType to JobGraph and ExecutionGraph
- added interface Stoppable, applied to SourceStreamTask
- added STOP signal logic to JobManager, TaskManager, ExecutionGraph
- extended Client to support stop
- extended Cli frontend, JobManager frontend
- updated documentation

Fix JobManagerTest.testStopSignal and testStopSignalFail

The StoppableInvokable could not be instantiated by Task because it was
declared as a private class. Adds additional checks to verify that the stop
signal behaves correctly.

Auto-detect if job is stoppable

A job is stoppable iff all sources are stoppable

- Replace JobType by stoppable flag
- Add StoppableFunction and StoppableInvokable to support the optional stop 
operation
- added REST get/delete test (no extra YARN test -- not considered necessary, as
get and delete are both tested)
- bug fix: job got canceled instead of stopped in web interface
- Add StoppingException
- Allow to stop jobs when they are not in state RUNNING

Second round of Till's comments


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bdd4024e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bdd4024e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bdd4024e

Branch: refs/heads/master
Commit: bdd4024e20fdfb0accb6121a68780ce3a0c218c0
Parents: 5eae47f
Author: mjsax <mj...@informatik.hu-berlin.de>
Authored: Sat Sep 26 13:14:43 2015 +0200
Committer: Till Rohrmann <trohrm...@apache.org>
Committed: Mon Feb 15 16:16:13 2016 +0100

----------------------------------------------------------------------
 docs/apis/cli.md                                |   14 +
 .../org/apache/flink/client/CliFrontend.java    |   86 +-
 .../flink/client/cli/CliFrontendParser.java     |   41 +-
 .../apache/flink/client/cli/StopOptions.java    |   37 +
 .../org/apache/flink/client/program/Client.java |   59 +-
 .../client/program/ProgramStopException.java    |   53 +
 .../flink/client/CliFrontendStopTest.java       |  159 ++
 .../org/apache/flink/storm/api/FlinkClient.java |    6 +-
 .../flink/storm/wrappers/SpoutWrapper.java      |   13 +-
 .../api/common/functions/StoppableFunction.java |   33 +
 flink-runtime-web/pom.xml                       |   14 +
 .../runtime/webmonitor/WebRuntimeMonitor.java   |   11 +-
 .../handlers/JobCancellationHandler.java        |    3 +
 .../webmonitor/handlers/JobDetailsHandler.java  |    1 +
 .../webmonitor/handlers/JobStoppingHandler.java |   49 +
 .../webmonitor/testutils/HttpTestClient.java    |   19 +
 .../web-dashboard/app/partials/jobs/job.jade    |    4 +
 .../app/scripts/modules/jobs/jobs.ctrl.coffee   |    5 +
 .../app/scripts/modules/jobs/jobs.svc.coffee    |    5 +
 .../web-dashboard/web/css/vendor.css            |    1 +
 .../web-dashboard/web/js/vendor.js              | 2053 +++++++++++-------
 .../web-dashboard/web/partials/jobs/job.html    |    1 +
 .../apache/flink/runtime/StoppingException.java |   35 +
 .../flink/runtime/executiongraph/Execution.java |   93 +-
 .../runtime/executiongraph/ExecutionGraph.java  |   93 +-
 .../runtime/executiongraph/ExecutionVertex.java |    5 +-
 .../apache/flink/runtime/jobgraph/JobGraph.java |    1 +
 .../flink/runtime/jobgraph/JobVertex.java       |   85 +-
 .../runtime/jobgraph/tasks/StoppableTask.java   |   25 +
 .../apache/flink/runtime/taskmanager/Task.java  |   60 +-
 .../flink/runtime/jobmanager/JobManager.scala   |   27 +
 .../runtime/messages/JobManagerMessages.scala   |   25 +
 .../runtime/messages/TaskControlMessages.scala  |    9 +
 .../flink/runtime/taskmanager/TaskManager.scala |   22 +
 .../ExecutionGraphConstructionTest.java         |   97 +-
 .../ExecutionGraphDeploymentTest.java           |   27 +-
 .../ExecutionGraphSignalsTest.java              |  224 ++
 .../executiongraph/ExecutionGraphTestUtils.java |   12 +-
 .../ExecutionStateProgressTest.java             |  173 +-
 .../executiongraph/ExecutionVertexStopTest.java |  132 ++
 .../executiongraph/LocalInputSplitsTest.java    |   25 +-
 .../executiongraph/PointwisePatternTest.java    |   84 +-
 .../runtime/jobmanager/JobManagerTest.java      |  107 +-
 .../runtime/taskmanager/TaskManagerTest.java    |  131 +-
 .../flink/runtime/taskmanager/TaskStopTest.java |  114 +
 .../runtime/testutils/StoppableInvokable.java   |   32 +
 .../environment/StreamExecutionEnvironment.java |    9 +-
 .../flink/streaming/api/graph/StreamGraph.java  |    7 +-
 .../api/operators/StoppableStreamSource.java    |   51 +
 .../tasks/StoppableSourceStreamTask.java        |   33 +
 .../runtime/tasks/SourceStreamTaskTest.java     |   53 +-
 flink-tests/pom.xml                             |   16 +
 .../flink/test/web/WebFrontendITCase.java       |   89 +-
 53 files changed, 3389 insertions(+), 1174 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/bdd4024e/docs/apis/cli.md
----------------------------------------------------------------------
diff --git a/docs/apis/cli.md b/docs/apis/cli.md
index c9145bc..421ed94 100644
--- a/docs/apis/cli.md
+++ b/docs/apis/cli.md
@@ -108,6 +108,10 @@ The command line can be used to
 
         ./bin/flink cancel <jobID>
 
+-   Stop a job (streaming jobs only):
+
+        ./bin/flink stop <jobID>
+
 ### Savepoints
 
 [Savepoints]({{site.baseurl}}/apis/streaming/savepoints.html) are controlled 
via the command line client:
@@ -248,6 +252,16 @@ Action "cancel" cancels a running program.
                                    configuration.
 
 
+Action "stop" stops a running program (streaming jobs only).
+
+  Syntax: stop [OPTIONS] <Job ID>
+  "stop" action options:
+     -m,--jobmanager <host:port>   Address of the JobManager (master) to which
+                                   to connect. Use this flag to connect to a
+                                   different JobManager than the one specified
+                                   in the configuration.
+
+
 Action "savepoint" triggers savepoints for a running job or disposes existing 
ones.
 
   Syntax: savepoint [OPTIONS] <Job ID>

http://git-wip-us.apache.org/repos/asf/flink/blob/bdd4024e/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
----------------------------------------------------------------------
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java 
b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
index da91bca..98bf056 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
@@ -36,6 +36,7 @@ import org.apache.flink.client.cli.ListOptions;
 import org.apache.flink.client.cli.ProgramOptions;
 import org.apache.flink.client.cli.RunOptions;
 import org.apache.flink.client.cli.SavepointOptions;
+import org.apache.flink.client.cli.StopOptions;
 import org.apache.flink.client.program.Client;
 import org.apache.flink.client.program.PackagedProgram;
 import org.apache.flink.client.program.ProgramInvocationException;
@@ -56,7 +57,10 @@ import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.runtime.messages.JobManagerMessages.CancelJob;
+import 
org.apache.flink.runtime.messages.JobManagerMessages.CancellationFailure;
 import org.apache.flink.runtime.messages.JobManagerMessages.RunningJobsStatus;
+import org.apache.flink.runtime.messages.JobManagerMessages.StopJob;
+import org.apache.flink.runtime.messages.JobManagerMessages.StoppingFailure;
 import org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepoint;
 import 
org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepointSuccess;
 import org.apache.flink.runtime.security.SecurityUtils;
@@ -110,6 +114,7 @@ public class CliFrontend {
        public static final String ACTION_INFO = "info";
        private static final String ACTION_LIST = "list";
        private static final String ACTION_CANCEL = "cancel";
+       private static final String ACTION_STOP = "stop";
        private static final String ACTION_SAVEPOINT = "savepoint";
 
        // config dir parameters
@@ -290,9 +295,6 @@ public class CliFrontend {
                catch (FileNotFoundException e) {
                        return handleArgException(e);
                }
-               catch (ProgramInvocationException e) {
-                       return handleError(e);
-               }
                catch (Throwable t) {
                        return handleError(t);
                }
@@ -362,7 +364,7 @@ public class CliFrontend {
        /**
         * Executes the info action.
         * 
-        * @param args Command line arguments for the info action. 
+        * @param args Command line arguments for the info action.
         */
        protected int info(String[] args) {
                LOG.info("Running 'info' command.");
@@ -568,6 +570,65 @@ public class CliFrontend {
        }
 
        /**
+        * Executes the STOP action.
+        * 
+        * @param args Command line arguments for the stop action.
+        */
+       protected int stop(String[] args) {
+               LOG.info("Running 'stop' command.");
+
+               StopOptions options;
+               try {
+                       options = CliFrontendParser.parseStopCommand(args);
+               }
+               catch (CliArgsException e) {
+                       return handleArgException(e);
+               }
+               catch (Throwable t) {
+                       return handleError(t);
+               }
+
+               // evaluate help flag
+               if (options.isPrintHelp()) {
+                       CliFrontendParser.printHelpForStop();
+                       return 0;
+               }
+
+               String[] stopArgs = options.getArgs();
+               JobID jobId;
+
+               if (stopArgs.length > 0) {
+                       String jobIdString = stopArgs[0];
+                       try {
+                               jobId = new 
JobID(StringUtils.hexStringToByte(jobIdString));
+                       }
+                       catch (Exception e) {
+                               return handleError(e);
+                       }
+               }
+               else {
+                       return handleArgException(new CliArgsException("Missing 
JobID"));
+               }
+
+               try {
+                       ActorGateway jobManager = getJobManagerGateway(options);
+                       Future<Object> response = jobManager.ask(new 
StopJob(jobId), clientTimeout);
+
+                       final Object rc = Await.result(response, clientTimeout);
+
+                       if (rc instanceof StoppingFailure) {
+                               throw new Exception("Stopping the job with ID " 
+ jobId + " failed.",
+                                               ((StoppingFailure) rc).cause());
+                       }
+
+                       return 0;
+               }
+               catch (Throwable t) {
+                       return handleError(t);
+               }
+       }
+
+       /**
         * Executes the CANCEL action.
         * 
         * @param args Command line arguments for the cancel action.
@@ -616,13 +677,14 @@ public class CliFrontend {
                        ActorGateway jobManager = getJobManagerGateway(options);
                        Future<Object> response = jobManager.ask(new 
CancelJob(jobId), clientTimeout);
 
-                       try {
-                               Await.result(response, clientTimeout);
-                               return 0;
-                       }
-                       catch (Exception e) {
-                               throw new Exception("Canceling the job with ID 
" + jobId + " failed.", e);
+                       final Object rc = Await.result(response, clientTimeout);
+
+                       if (rc instanceof CancellationFailure) {
+                               throw new Exception("Canceling the job with ID 
" + jobId + " failed.",
+                                               ((CancellationFailure) 
rc).cause());
                        }
+
+                       return 0;
                }
                catch (Throwable t) {
                        return handleError(t);
@@ -1123,6 +1185,8 @@ public class CliFrontend {
                                return info(params);
                        case ACTION_CANCEL:
                                return cancel(params);
+                       case ACTION_STOP:
+                               return stop(params);
                        case ACTION_SAVEPOINT:
                                return savepoint(params);
                        case "-h":
@@ -1139,7 +1203,7 @@ public class CliFrontend {
                        default:
                                System.out.printf("\"%s\" is not a valid 
action.\n", action);
                                System.out.println();
-                               System.out.println("Valid actions are \"run\", 
\"list\", \"info\", or \"cancel\".");
+                               System.out.println("Valid actions are \"run\", 
\"list\", \"info\", \"stop\", or \"cancel\".");
                                System.out.println();
                                System.out.println("Specify the version option 
(-v or --version) to print Flink version.");
                                System.out.println();

http://git-wip-us.apache.org/repos/asf/flink/blob/bdd4024e/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
----------------------------------------------------------------------
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
 
b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
index 07d409e..2ac53d2 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
@@ -39,13 +39,13 @@ public class CliFrontendParser {
 
 
        static final Option HELP_OPTION = new Option("h", "help", false,
-                                                                               
                "Show the help message for the CLI Frontend or the action.");
+                       "Show the help message for the CLI Frontend or the 
action.");
 
        static final Option JAR_OPTION = new Option("j", "jarfile", true, 
"Flink program JAR file.");
 
        public static final Option CLASS_OPTION = new Option("c", "class", true,
                        "Class with the program entry point (\"main\" method or 
\"getPlan()\" method. Only needed if the " +
-                                       "JAR file does not specify the class in 
its manifest.");
+                       "JAR file does not specify the class in its manifest.");
 
        static final Option CLASSPATH_OPTION = new Option("C", "classpath", 
true, "Adds a URL to each user code " +
                        "classloader  on all nodes in the cluster. The paths 
must specify a protocol (e.g. file://) and be " +
@@ -55,7 +55,7 @@ public class CliFrontendParser {
 
        static final Option PARALLELISM_OPTION = new Option("p", "parallelism", 
true,
                        "The parallelism with which to run the program. 
Optional flag to override the default value " +
-                                       "specified in the configuration.");
+                       "specified in the configuration.");
 
        static final Option LOGGING_OPTION = new Option("q", "sysoutLogging", 
false, "If present, " +
                        "supress logging output to standard out.");
@@ -67,9 +67,9 @@ public class CliFrontendParser {
                        "Program arguments. Arguments can also be added without 
-a, simply as trailing parameters.");
 
        static final Option ADDRESS_OPTION = new Option("m", "jobmanager", true,
-                       "Address of the JobManager (master) to which to 
connect. Specify '" + CliFrontend.YARN_DEPLOY_JOBMANAGER
-                                       + "' as the JobManager to deploy a YARN 
cluster for the job. Use this flag to connect to a " +
-                                       "different JobManager than the one 
specified in the configuration.");
+                       "Address of the JobManager (master) to which to 
connect. Specify '" + CliFrontend.YARN_DEPLOY_JOBMANAGER +
+                       "' as the JobManager to deploy a YARN cluster for the 
job. Use this flag to connect to a " +
+                       "different JobManager than the one specified in the 
configuration.");
 
        static final Option SAVEPOINT_PATH_OPTION = new Option("s", 
"fromSavepoint", true,
                        "Path to a savepoint to reset the job back to (for 
example file:///flink/savepoint-1537).");
@@ -123,6 +123,7 @@ public class CliFrontendParser {
        private static final Options INFO_OPTIONS = 
getInfoOptions(buildGeneralOptions(new Options()));
        private static final Options LIST_OPTIONS = 
getListOptions(buildGeneralOptions(new Options()));
        private static final Options CANCEL_OPTIONS = 
getCancelOptions(buildGeneralOptions(new Options()));
+       private static final Options STOP_OPTIONS = 
getStopOptions(buildGeneralOptions(new Options()));
        private static final Options SAVEPOINT_OPTIONS = 
getSavepointOptions(buildGeneralOptions(new Options()));
 
        private static Options buildGeneralOptions(Options options) {
@@ -197,6 +198,11 @@ public class CliFrontendParser {
                return options;
        }
 
+       private static Options getStopOptions(Options options) {
+               options = getJobManagerAddressOption(options);
+               return options;
+       }
+
        private static Options getSavepointOptions(Options options) {
                options = getJobManagerAddressOption(options);
                options.addOption(SAVEPOINT_DISPOSE_OPTION);
@@ -218,6 +224,7 @@ public class CliFrontendParser {
                printHelpForRun();
                printHelpForInfo();
                printHelpForList();
+               printHelpForStop();
                printHelpForCancel();
                printHelpForSavepoint();
 
@@ -264,6 +271,18 @@ public class CliFrontendParser {
                System.out.println();
        }
 
+       public static void printHelpForStop() {
+               HelpFormatter formatter = new HelpFormatter();
+               formatter.setLeftPadding(5);
+               formatter.setWidth(80);
+
+               System.out.println("\nAction \"stop\" stops a running program 
(streaming jobs only).");
+               System.out.println("\n  Syntax: stop [OPTIONS] <Job ID>");
+               formatter.setSyntaxPrefix("  \"stop\" action options:");
+               formatter.printHelp(" ", getStopOptions(new Options()));
+               System.out.println();
+       }
+
        public static void printHelpForCancel() {
                HelpFormatter formatter = new HelpFormatter();
                formatter.setLeftPadding(5);
@@ -325,6 +344,16 @@ public class CliFrontendParser {
                }
        }
 
+       public static StopOptions parseStopCommand(String[] args) throws 
CliArgsException {
+               try {
+                       PosixParser parser = new PosixParser();
+                       CommandLine line = parser.parse(STOP_OPTIONS, args, 
false);
+                       return new StopOptions(line);
+               } catch (ParseException e) {
+                       throw new CliArgsException(e.getMessage());
+               }
+       }
+
        public static SavepointOptions parseSavepointCommand(String[] args) 
throws CliArgsException {
                try {
                        PosixParser parser = new PosixParser();

http://git-wip-us.apache.org/repos/asf/flink/blob/bdd4024e/flink-clients/src/main/java/org/apache/flink/client/cli/StopOptions.java
----------------------------------------------------------------------
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/cli/StopOptions.java 
b/flink-clients/src/main/java/org/apache/flink/client/cli/StopOptions.java
new file mode 100644
index 0000000..7f246c8
--- /dev/null
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/StopOptions.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.client.cli;
+
+import org.apache.commons.cli.CommandLine;
+
+/**
+ * Command line options for the STOP command
+ */
+public class StopOptions extends CommandLineOptions {
+
+       private final String[] args;
+
+       public StopOptions(CommandLine line) {
+               super(line);
+               this.args = line.getArgs();
+       }
+
+       public String[] getArgs() {
+               return args == null ? new String[0] : args;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/bdd4024e/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
----------------------------------------------------------------------
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java 
b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
index 452710c..999b461 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
@@ -29,7 +29,6 @@ import com.google.common.base.Preconditions;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobSubmissionResult;
 import org.apache.flink.api.common.accumulators.AccumulatorHelper;
-import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.Plan;
 import org.apache.flink.optimizer.CompilerException;
@@ -43,16 +42,17 @@ import 
org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
 import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.client.JobClient;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
-import org.apache.flink.runtime.messages.JobManagerMessages;
-import org.apache.flink.runtime.util.LeaderRetrievalUtils;
 import 
org.apache.flink.runtime.messages.accumulators.AccumulatorResultsErroneous;
 import org.apache.flink.runtime.messages.accumulators.AccumulatorResultsFound;
 import 
org.apache.flink.runtime.messages.accumulators.RequestAccumulatorResults;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.util.LeaderRetrievalUtils;
 import org.apache.flink.util.SerializedValue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -413,25 +413,60 @@ public class Client {
         * @throws Exception In case an error occurred.
         */
        public void cancel(JobID jobId) throws Exception {
-               ActorGateway jobManagerGateway = getJobManagerGateway();
+               final ActorGateway jobManagerGateway = getJobManagerGateway();
 
-               Future<Object> response;
+               final Future<Object> response;
                try {
                        response = jobManagerGateway.ask(new 
JobManagerMessages.CancelJob(jobId), timeout);
-               } catch (Exception e) {
+               } catch (final Exception e) {
                        throw new ProgramInvocationException("Failed to query 
the job manager gateway.", e);
                }
 
-               Object result = Await.result(response, timeout);
+               final Object result = Await.result(response, timeout);
 
                if (result instanceof JobManagerMessages.CancellationSuccess) {
-                       LOG.debug("Job cancellation with ID " + jobId + " 
succeeded.");
+                       LOG.info("Job cancellation with ID " + jobId + " 
succeeded.");
                } else if (result instanceof 
JobManagerMessages.CancellationFailure) {
-                       Throwable t = ((JobManagerMessages.CancellationFailure) 
result).cause();
-                       LOG.debug("Job cancellation with ID " + jobId + " 
failed.", t);
+                       final Throwable t = 
((JobManagerMessages.CancellationFailure) result).cause();
+                       LOG.info("Job cancellation with ID " + jobId + " 
failed.", t);
                        throw new Exception("Failed to cancel the job because 
of \n" + t.getMessage());
                } else {
-                       throw new Exception("Unknown message received while 
cancelling.");
+                       throw new Exception("Unknown message received while 
cancelling: " + result.getClass().getName());
+               }
+       }
+
+       /**
+        * Stops a program on Flink cluster whose job-manager is configured in 
this client's configuration.
+        * Stopping works only for streaming programs. Be aware, that the 
program might continue to run for
+        * a while after sending the stop command, because after sources 
stopped to emit data all operators
+        * need to finish processing.
+        * 
+        * @param jobId
+        *            the job ID of the streaming program to stop
+        * @throws ProgramStopException
+        *             If the job ID is invalid (ie, is unknown or refers to a 
batch job) or if sending the stop signal
+        *             failed. That might be due to an I/O problem, ie, the 
job-manager is unreachable.
+        */
+       public void stop(final JobID jobId) throws Exception {
+               final ActorGateway jobManagerGateway = getJobManagerGateway();
+
+               final Future<Object> response;
+               try {
+                       response = jobManagerGateway.ask(new 
JobManagerMessages.StopJob(jobId), timeout);
+               } catch (final Exception e) {
+                       throw new ProgramInvocationException("Failed to query 
the job manager gateway.", e);
+               }
+
+               final Object result = Await.result(response, timeout);
+
+               if (result instanceof JobManagerMessages.StoppingSuccess) {
+                       LOG.info("Job stopping with ID " + jobId + " 
succeeded.");
+               } else if (result instanceof 
JobManagerMessages.StoppingFailure) {
+                       final Throwable t = 
((JobManagerMessages.StoppingFailure) result).cause();
+                       LOG.info("Job stopping with ID " + jobId + " failed.", 
t);
+                       throw new Exception("Failed to stop the job because of 
\n" + t.getMessage());
+               } else {
+                       throw new Exception("Unknown message received while 
stopping: " + result.getClass().getName());
                }
        }
 
@@ -482,7 +517,7 @@ public class Client {
        // 
------------------------------------------------------------------------
        //  Sessions
        // 
------------------------------------------------------------------------
-       
+
        /**
         * Tells the JobManager to finish the session (job) defined by the 
given ID.
         * 

http://git-wip-us.apache.org/repos/asf/flink/blob/bdd4024e/flink-clients/src/main/java/org/apache/flink/client/program/ProgramStopException.java
----------------------------------------------------------------------
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/program/ProgramStopException.java
 
b/flink-clients/src/main/java/org/apache/flink/client/program/ProgramStopException.java
new file mode 100644
index 0000000..a1d8a9b
--- /dev/null
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/program/ProgramStopException.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding 
copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the "License"); you may not 
use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either 
express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.flink.client.program;
+
+/**
+ * Exception used to indicate that there is an error during stopping of a 
Flink program.
+ */
+public class ProgramStopException extends Exception {
+       private static final long serialVersionUID = -906791331829704450L;
+
+       /**
+        * Creates a <tt>ProgramStopException</tt> with the given message.
+        * 
+        * @param message
+        *            The message for the exception.
+        */
+       public ProgramStopException(String message) {
+               super(message);
+       }
+
+       /**
+        * Creates a <tt>ProgramStopException</tt> for the given exception.
+        * 
+        * @param cause
+        *            The exception that causes the program invocation to fail.
+        */
+       public ProgramStopException(Throwable cause) {
+               super(cause);
+       }
+
+       /**
+        * Creates a <tt>ProgramStopException</tt> for the given exception with 
an additional message.
+        * 
+        * @param message
+        *            The additional message.
+        * @param cause
+        *            The exception that causes the program invocation to fail.
+        */
+       public ProgramStopException(String message, Throwable cause) {
+               super(message, cause);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/bdd4024e/flink-clients/src/test/java/org/apache/flink/client/CliFrontendStopTest.java
----------------------------------------------------------------------
diff --git 
a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendStopTest.java 
b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendStopTest.java
new file mode 100644
index 0000000..7c34c75
--- /dev/null
+++ 
b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendStopTest.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client;
+
+import akka.actor.*;
+import akka.testkit.JavaTestKit;
+
+import org.apache.flink.client.cli.CommandLineOptions;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.akka.FlinkUntypedActor;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.instance.AkkaActorGateway;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.util.TestLogger;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.UUID;
+
+import static org.apache.flink.client.CliFrontendTestUtils.pipeSystemOutToNull;
+import static 
org.apache.flink.client.CliFrontendTestUtils.clearGlobalConfiguration;
+import static org.junit.Assert.*;
+
+public class CliFrontendStopTest extends TestLogger {
+
+       private static ActorSystem actorSystem;
+
+       @BeforeClass
+       public static void setup() {
+               pipeSystemOutToNull();
+               clearGlobalConfiguration();
+               actorSystem = ActorSystem.create("TestingActorSystem");
+       }
+
+       @AfterClass
+       public static void teardown() {
+               JavaTestKit.shutdownActorSystem(actorSystem);
+               actorSystem = null;
+       }
+
+       @Test
+       public void testStop() throws Exception {
+               // test unrecognized option
+               {
+                       String[] parameters = { "-v", "-l" };
+                       CliFrontend testFrontend = new 
CliFrontend(CliFrontendTestUtils.getConfigDir());
+                       int retCode = testFrontend.stop(parameters);
+                       assertTrue(retCode != 0);
+               }
+
+               // test missing job id
+               {
+                       String[] parameters = {};
+                       CliFrontend testFrontend = new 
CliFrontend(CliFrontendTestUtils.getConfigDir());
+                       int retCode = testFrontend.stop(parameters);
+                       assertTrue(retCode != 0);
+               }
+
+               // test stop properly
+               {
+                       JobID jid = new JobID();
+                       String jidString = jid.toString();
+
+                       final UUID leaderSessionID = UUID.randomUUID();
+                       final ActorRef jm = 
actorSystem.actorOf(Props.create(CliJobManager.class, jid, leaderSessionID));
+
+                       final ActorGateway gateway = new AkkaActorGateway(jm, 
leaderSessionID);
+
+                       String[] parameters = { jidString };
+                       StopTestCliFrontend testFrontend = new 
StopTestCliFrontend(gateway);
+
+                       int retCode = testFrontend.stop(parameters);
+                       assertTrue(retCode == 0);
+               }
+
+               // test unknown job Id
+               {
+                       JobID jid1 = new JobID();
+                       JobID jid2 = new JobID();
+
+                       final UUID leaderSessionID = UUID.randomUUID();
+                       final ActorRef jm = 
actorSystem.actorOf(Props.create(CliJobManager.class, jid1, leaderSessionID));
+
+                       final ActorGateway gateway = new AkkaActorGateway(jm, 
leaderSessionID);
+
+                       String[] parameters = { jid2.toString() };
+                       StopTestCliFrontend testFrontend = new 
StopTestCliFrontend(gateway);
+
+                       assertTrue(testFrontend.stop(parameters) != 0);
+               }
+       }
+
+       protected static final class StopTestCliFrontend extends CliFrontend {
+
+               private ActorGateway jobManagerGateway;
+
+               public StopTestCliFrontend(ActorGateway jobManagerGateway) 
throws Exception {
+                       super(CliFrontendTestUtils.getConfigDir());
+                       this.jobManagerGateway = jobManagerGateway;
+               }
+
+               @Override
+               public ActorGateway getJobManagerGateway(CommandLineOptions 
options) {
+                       return jobManagerGateway;
+               }
+       }
+
+       protected static final class CliJobManager extends FlinkUntypedActor {
+               private final JobID jobID;
+               private final UUID leaderSessionID;
+
+               public CliJobManager(final JobID jobID, final UUID 
leaderSessionID) {
+                       this.jobID = jobID;
+                       this.leaderSessionID = leaderSessionID;
+               }
+
+               @Override
+               public void handleMessage(Object message) {
+                       if (message instanceof 
JobManagerMessages.RequestTotalNumberOfSlots$) {
+                               getSender().tell(decorateMessage(1), getSelf());
+                       } else if (message instanceof 
JobManagerMessages.StopJob) {
+                               JobManagerMessages.StopJob stopJob = 
(JobManagerMessages.StopJob) message;
+
+                               if (jobID != null && 
jobID.equals(stopJob.jobID())) {
+                                       getSender().tell(decorateMessage(new 
Status.Success(new Object())), getSelf());
+                               } else {
+                                       getSender()
+                                                       
.tell(decorateMessage(new Status.Failure(new Exception(
+                                                                       "Wrong 
or no JobID"))), getSelf());
+                               }
+                       } else if (message instanceof 
JobManagerMessages.RequestRunningJobsStatus$) {
+                               getSender().tell(decorateMessage(new 
JobManagerMessages.RunningJobsStatus()),
+                                               getSelf());
+                       }
+               }
+
+               @Override
+               protected UUID getLeaderSessionID() {
+                       return leaderSessionID;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/bdd4024e/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java
 
b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java
index 767f1b1..2541345 100644
--- 
a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java
+++ 
b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java
@@ -208,7 +208,7 @@ public class FlinkClient {
                final Client client;
                try {
                        client = new Client(configuration);
-               } catch (IOException e) {
+               } catch (final IOException e) {
                        throw new RuntimeException("Could not establish a 
connection to the job manager", e);
                }
 
@@ -253,7 +253,7 @@ public class FlinkClient {
                }
 
                try {
-                       client.cancel(jobId);
+                       client.stop(jobId);
                } catch (final Exception e) {
                        throw new RuntimeException("Cannot stop job.", e);
                }
@@ -282,7 +282,7 @@ public class FlinkClient {
                        final Future<Object> response = 
Patterns.ask(jobManager, JobManagerMessages.getRequestRunningJobsStatus(),
                                        new Timeout(askTimeout));
 
-                       Object result;
+                       final Object result;
                        try {
                                result = Await.result(response, askTimeout);
                        } catch (final Exception e) {

http://git-wip-us.apache.org/repos/asf/flink/blob/bdd4024e/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/SpoutWrapper.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/SpoutWrapper.java
 
b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/SpoutWrapper.java
index e077aeb..66b05c6 100644
--- 
a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/SpoutWrapper.java
+++ 
b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/SpoutWrapper.java
@@ -25,6 +25,7 @@ import backtype.storm.topology.IRichSpout;
 import com.google.common.collect.Sets;
 
 import org.apache.flink.api.common.ExecutionConfig.GlobalJobParameters;
+import org.apache.flink.api.common.functions.StoppableFunction;
 import org.apache.flink.api.java.tuple.Tuple0;
 import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.api.java.tuple.Tuple25;
@@ -52,7 +53,7 @@ import java.util.HashMap;
  * is {@code null}, {@link SpoutWrapper} calls {@link IRichSpout#nextTuple() 
nextTuple()} method until
  * {@link FiniteSpout#reachedEnd()} returns true.
  */
-public final class SpoutWrapper<OUT> extends RichParallelSourceFunction<OUT> {
+public final class SpoutWrapper<OUT> extends RichParallelSourceFunction<OUT> 
implements StoppableFunction {
        private static final long serialVersionUID = -218340336648247605L;
 
        /** Number of attributes of the spouts's output tuples per stream. */
@@ -299,6 +300,16 @@ public final class SpoutWrapper<OUT> extends 
RichParallelSourceFunction<OUT> {
                this.isRunning = false;
        }
 
+       /**
+        * {@inheritDoc}
+        * <p>
+        * Sets the {@link #isRunning} flag to {@code false}.
+        */
+       @Override
+       public void stop() {
+               this.isRunning = false;
+       }
+
        @Override
        public void close() throws Exception {
                this.spout.close();

http://git-wip-us.apache.org/repos/asf/flink/blob/bdd4024e/flink-core/src/main/java/org/apache/flink/api/common/functions/StoppableFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/StoppableFunction.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/StoppableFunction.java
new file mode 100644
index 0000000..a83b73f
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/StoppableFunction.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding 
copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the "License"); you may not 
use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either 
express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.flink.api.common.functions;
+
+/**
+ * Must be implemented by stoppable functions, e.g., source functions of 
streaming jobs. The method {@link #stop()} will
+ * be called when the job receives the STOP signal. On this signal, the source 
function must stop emitting new data and
+ * terminate gracefully.
+ */
+public interface StoppableFunction {
+       /**
+        * Stops the source. In contrast to {@code cancel()}, this is a request 
to the source function to shut down
+        * gracefully. Pending data can still be emitted, and the source is not required 
to stop immediately, but it should do so in the near
+        * future. The job will keep running until all emitted data is 
processed completely.
+        * <p>
+        * Most streaming sources will have a while loop inside the {@code 
run()} method. You need to ensure that the source
+        * will break out of this loop. This can be achieved by having a 
volatile field "isRunning" that is checked in the
+        * loop and that is set to false in this method.
+        * <p>
+        * <strong>The call to {@code stop()} should not block and not throw 
any exception.</strong>
+        */
+       public void stop();
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/bdd4024e/flink-runtime-web/pom.xml
----------------------------------------------------------------------
diff --git a/flink-runtime-web/pom.xml b/flink-runtime-web/pom.xml
index 1a19fb1..27bbbd5 100644
--- a/flink-runtime-web/pom.xml
+++ b/flink-runtime-web/pom.xml
@@ -44,6 +44,20 @@ under the License.
                                </includes>
                        </resource>
                </resources>
+               <plugins>
+                       <plugin>
+                               <groupId>org.apache.maven.plugins</groupId>
+                               <artifactId>maven-jar-plugin</artifactId>
+                               <inherited>true</inherited>
+                               <executions>
+                                       <execution>
+                                               <goals>
+                                                       <goal>test-jar</goal>
+                                               </goals>
+                                       </execution>
+                               </executions>
+                       </plugin>
+               </plugins>
        </build>
 
        <dependencies>

http://git-wip-us.apache.org/repos/asf/flink/blob/bdd4024e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
index 0b5de1f..67c0dab 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
@@ -53,6 +53,7 @@ import 
org.apache.flink.runtime.webmonitor.handlers.JobDetailsHandler;
 import org.apache.flink.runtime.webmonitor.handlers.JobExceptionsHandler;
 import org.apache.flink.runtime.webmonitor.handlers.JobManagerConfigHandler;
 import org.apache.flink.runtime.webmonitor.handlers.JobPlanHandler;
+import org.apache.flink.runtime.webmonitor.handlers.JobStoppingHandler;
 import 
org.apache.flink.runtime.webmonitor.handlers.JobVertexAccumulatorsHandler;
 import 
org.apache.flink.runtime.webmonitor.handlers.JobVertexBackPressureHandler;
 import 
org.apache.flink.runtime.webmonitor.handlers.JobVertexCheckpointsHandler;
@@ -245,8 +246,14 @@ public class WebRuntimeMonitor implements WebMonitor {
                        // Cancel a job via GET (for proper integration with 
YARN this has to be performed via GET)
                        .GET("/jobs/:jobid/yarn-cancel", handler(new 
JobCancellationHandler()))
 
-                       // DELETE is the preferred way of cancelling a job 
(Rest-conform)
-                       .DELETE("/jobs/:jobid", handler(new 
JobCancellationHandler()));
+                       // DELETE is the preferred way of canceling a job 
(Rest-conform)
+                       .DELETE("/jobs/:jobid/cancel", handler(new 
JobCancellationHandler()))
+
+                       // stop a job via GET (for proper integration with YARN 
this has to be performed via GET)
+                       .GET("/jobs/:jobid/yarn-stop", handler(new 
JobStoppingHandler()))
+
+                       // DELETE is the preferred way of stopping a job 
(Rest-conform)
+                       .DELETE("/jobs/:jobid/stop", handler(new 
JobStoppingHandler()));
 
                if (webSubmitAllow) {
                        router

http://git-wip-us.apache.org/repos/asf/flink/blob/bdd4024e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandler.java
index aae8b34..b17acdc 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandler.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandler.java
@@ -25,6 +25,9 @@ import org.apache.flink.util.StringUtils;
 
 import java.util.Map;
 
+/**
+ * Request handler for the CANCEL request.
+ */
 public class JobCancellationHandler implements RequestHandler {
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/bdd4024e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java
index 4511c17..4f31128 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java
@@ -61,6 +61,7 @@ public class JobDetailsHandler extends 
AbstractExecutionGraphRequestHandler {
                // basic info
                gen.writeStringField("jid", graph.getJobID().toString());
                gen.writeStringField("name", graph.getJobName());
+               gen.writeBooleanField("isStoppable", graph.isStoppable());
                gen.writeStringField("state", graph.getState().name());
                
                // times and duration

http://git-wip-us.apache.org/repos/asf/flink/blob/bdd4024e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobStoppingHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobStoppingHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobStoppingHandler.java
new file mode 100644
index 0000000..791790a
--- /dev/null
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobStoppingHandler.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.util.StringUtils;
+
+import java.util.Map;
+
+/**
+ * Request handler for the STOP request. Sends a StopJob message for the job given by the "jobid" path parameter.
+ */
+public class JobStoppingHandler implements RequestHandler {
+
+       @Override
+       public String handleRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
+               try {
+                       JobID jobid = new JobID(StringUtils.hexStringToByte(pathParams.get("jobid")));
+                       if (jobManager != null) {
+                               jobManager.tell(new JobManagerMessages.StopJob(jobid)); // no reply is read here
+                               return "{}";
+                       }
+                       else {
+                               throw new Exception("No connection to the leading JobManager.");
+                       }
+               }
+               catch (Exception e) {
+                       throw new Exception("Failed to stop the job with id: " + pathParams.get("jobid") + ". " + e.getMessage(), e); // ". " separates the id from the cause
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/bdd4024e/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/testutils/HttpTestClient.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/testutils/HttpTestClient.java
 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/testutils/HttpTestClient.java
index d7d4457..9a396d3 100644
--- 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/testutils/HttpTestClient.java
+++ 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/testutils/HttpTestClient.java
@@ -165,6 +165,25 @@ public class HttpTestClient implements AutoCloseable {
        }
 
        /**
+        * Sends a simple DELETE request to the given path. You only specify 
the $path part of
+        * http://$host:$port/$path.
+        *
+        * @param path The $path to DELETE (http://$host:$port/$path)
+        */
+       public void sendDeleteRequest(String path, FiniteDuration timeout) 
throws TimeoutException, InterruptedException {
+               if (!path.startsWith("/")) {
+                       path = "/" + path;
+               }
+
+               HttpRequest getRequest = new 
DefaultFullHttpRequest(HttpVersion.HTTP_1_1,
+                               HttpMethod.DELETE, path);
+               getRequest.headers().set(HttpHeaders.Names.HOST, host);
+               getRequest.headers().set(HttpHeaders.Names.CONNECTION, 
HttpHeaders.Values.CLOSE);
+
+               sendRequest(getRequest, timeout);
+       }
+
+       /**
         * Returns the next available HTTP response. A call to this method 
blocks until a response
         * becomes available.
         *

http://git-wip-us.apache.org/repos/asf/flink/blob/bdd4024e/flink-runtime-web/web-dashboard/app/partials/jobs/job.jade
----------------------------------------------------------------------
diff --git a/flink-runtime-web/web-dashboard/app/partials/jobs/job.jade 
b/flink-runtime-web/web-dashboard/app/partials/jobs/job.jade
index 5b541ae..fe3e0fc 100644
--- a/flink-runtime-web/web-dashboard/app/partials/jobs/job.jade
+++ b/flink-runtime-web/web-dashboard/app/partials/jobs/job.jade
@@ -43,6 +43,10 @@ 
nav.navbar.navbar-default.navbar-fixed-top.navbar-main(ng-if="job")
     span.navbar-info-button.btn.btn-default(ng-click="cancelJob($event)")
       | Cancel
 
+  .navbar-info.last.first(ng-if!="job.isStoppable && (job.state=='CREATED' || 
job.state=='RUNNING' || job.state=='RESTARTING')")
+    span.navbar-info-button.btn.btn-default(ng-click="stopJob($event)")
+      | Stop
+
 nav.navbar.navbar-default.navbar-fixed-top.navbar-main-additional(ng-if="job")
   ul.nav.nav-tabs
     li(ui-sref-active='active')

http://git-wip-us.apache.org/repos/asf/flink/blob/bdd4024e/flink-runtime-web/web-dashboard/app/scripts/modules/jobs/jobs.ctrl.coffee
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/web-dashboard/app/scripts/modules/jobs/jobs.ctrl.coffee 
b/flink-runtime-web/web-dashboard/app/scripts/modules/jobs/jobs.ctrl.coffee
index f0ce892..931976d 100644
--- a/flink-runtime-web/web-dashboard/app/scripts/modules/jobs/jobs.ctrl.coffee
+++ b/flink-runtime-web/web-dashboard/app/scripts/modules/jobs/jobs.ctrl.coffee
@@ -80,6 +80,11 @@ angular.module('flinkApp')
     JobsService.cancelJob($stateParams.jobid).then (data) ->
       {}
 
+  $scope.stopJob = (stopEvent) ->
+    
angular.element(stopEvent.currentTarget).removeClass("btn").removeClass("btn-default").html('Stopping...')
+    JobsService.stopJob($stateParams.jobid).then (data) ->
+      {}
+
   $scope.toggleHistory = ->
     $scope.showHistory = !$scope.showHistory
 

http://git-wip-us.apache.org/repos/asf/flink/blob/bdd4024e/flink-runtime-web/web-dashboard/app/scripts/modules/jobs/jobs.svc.coffee
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/web-dashboard/app/scripts/modules/jobs/jobs.svc.coffee 
b/flink-runtime-web/web-dashboard/app/scripts/modules/jobs/jobs.svc.coffee
index 65ae5cb..71f0921 100644
--- a/flink-runtime-web/web-dashboard/app/scripts/modules/jobs/jobs.svc.coffee
+++ b/flink-runtime-web/web-dashboard/app/scripts/modules/jobs/jobs.svc.coffee
@@ -282,4 +282,9 @@ angular.module('flinkApp')
     # proper "DELETE jobs/<jobid>/"
     $http.get flinkConfig.jobServer + "jobs/" + jobid + "/yarn-cancel"
 
+  @stopJob = (jobid) ->
+    # uses the non REST-compliant GET yarn-stop handler which is available in addition to the
+    # proper "DELETE jobs/<jobid>/stop" (URL is prefixed with flinkConfig.jobServer, as in cancelJob)
+    $http.get flinkConfig.jobServer + "jobs/" + jobid + "/yarn-stop"
+
   @

http://git-wip-us.apache.org/repos/asf/flink/blob/bdd4024e/flink-runtime-web/web-dashboard/web/css/vendor.css
----------------------------------------------------------------------
diff --git a/flink-runtime-web/web-dashboard/web/css/vendor.css 
b/flink-runtime-web/web-dashboard/web/css/vendor.css
index 2a8d00f..e0c9259 100644
--- a/flink-runtime-web/web-dashboard/web/css/vendor.css
+++ b/flink-runtime-web/web-dashboard/web/css/vendor.css
@@ -5902,6 +5902,7 @@ button.close {
 .modal-header {
   padding: 15px;
   border-bottom: 1px solid #e5e5e5;
+  min-height: 16.42857143px;
 }
 .modal-header .close {
   margin-top: -2px;

Reply via email to