Repository: flink
Updated Branches:
  refs/heads/master fd410d9f6 -> 5783671c2


[FLINK-4717] Add CancelJobWithSavepoint

- Adds CancelJobWithSavepoint message, which triggers a savepoint
  before cancelling the respective job.
- Adds -s [targetDirectory] option to CLI cancel command:
    * bin/flink cancel <jobID> (regular cancelling)
    * bin/flink cancel -s <jobID> (cancel with savepoint to default dir)
    * bin/flink cancel -s <targetDir> <jobID> (cancel with savepoint to targetDir)
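
As an illustrative aside (not part of the commit): because the `-s` option argument is optional, the CLI has to guess whether a lone value after `-s` is a JobID or a target directory. A minimal shell sketch of that check, assuming a JobID always renders as 32 hex characters:

```shell
#!/bin/sh
# Sketch only, not from the commit: tell a JobID (32 hex characters)
# apart from a savepoint target directory after `cancel -s`.
classify_cancel_arg() {
    if printf '%s' "$1" | grep -Eq '^[0-9a-fA-F]{32}$'; then
        echo "jobid"       # cancel -s <jobID>: savepoint goes to the default dir
    else
        echo "targetdir"   # cancel -s <targetDir> <jobID>: savepoint goes to targetDir
    fi
}

classify_cancel_arg "fd410d9f6f89d9d9fd410d9f6f89d9d9"   # prints "jobid"
classify_cancel_arg "hdfs:///flink/savepoints"           # prints "targetdir"
```

A directory whose name happens to be 32 hex characters would be misclassified here; the actual change below resolves the ambiguity the same way, by first trying to parse the value as a JobID.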

This closes #2609.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5783671c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5783671c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5783671c

Branch: refs/heads/master
Commit: 5783671c2f30228a2d5b5b7bf09b762ae41db8e2
Parents: fd410d9
Author: Ufuk Celebi <u...@apache.org>
Authored: Fri Oct 7 11:48:47 2016 +0200
Committer: Ufuk Celebi <u...@apache.org>
Committed: Fri Oct 14 10:07:09 2016 +0200

----------------------------------------------------------------------
 docs/setup/cli.md                               | 192 ++++++++++---------
 .../org/apache/flink/client/CliFrontend.java    |  55 +++++-
 .../apache/flink/client/cli/CancelOptions.java  |  18 ++
 .../flink/client/cli/CliFrontendParser.java     |  14 +-
 .../flink/client/CliFrontendListCancelTest.java | 106 +++++++++-
 .../checkpoint/CheckpointCoordinator.java       |  26 ++-
 .../checkpoint/CheckpointDeclineReason.java     |   2 +
 .../runtime/checkpoint/PendingCheckpoint.java   |   9 +
 .../flink/runtime/jobmanager/JobManager.scala   |  56 ++++++
 .../runtime/messages/JobManagerMessages.scala   |  19 +-
 .../checkpoint/CheckpointCoordinatorTest.java   |  89 ++++++---
 .../checkpoint/CheckpointStateRestoreTest.java  |   4 +-
 .../checkpoint/PendingCheckpointTest.java       |   2 +-
 .../jobmanager/JobManagerHARecoveryTest.java    |  11 +-
 .../runtime/jobmanager/JobManagerTest.java      | 142 ++++++++++++++
 .../test/checkpointing/SavepointITCase.java     |   4 +
 16 files changed, 607 insertions(+), 142 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/5783671c/docs/setup/cli.md
----------------------------------------------------------------------
diff --git a/docs/setup/cli.md b/docs/setup/cli.md
index 251a0f6..0d3045e 100644
--- a/docs/setup/cli.md
+++ b/docs/setup/cli.md
@@ -113,6 +113,10 @@ The command line can be used to
 
         ./bin/flink cancel <jobID>
 
+-   Cancel a job with a savepoint:
+
+        ./bin/flink cancel -s [targetDirectory] <jobID>
+
 -   Stop a job (streaming jobs only):
 
         ./bin/flink stop <jobID>
@@ -144,7 +148,19 @@ Returns the path of the created savepoint. You need this path to restore and dis
 
 You can optionally specify a `savepointDirectory` when triggering the savepoint. If you don't specify one here, you need to configure a default savepoint directory for the Flink installation (see [[savepoint.html#configuration]]).
 
-#### **Restore a savepoint**
+##### Cancel with a savepoint
+
+You can atomically trigger a savepoint and cancel a job.
+
+{% highlight bash %}
+./bin/flink cancel -s [savepointDirectory] <jobID>
+{% endhighlight %}
+
+If you don't specify a target directory, you need to configure a default savepoint directory for the Flink installation (see [[savepoint.html#configuration]]).
+
+The job will only be cancelled if the savepoint succeeds.
+
+#### Restore a savepoint
 
 {% highlight bash %}
 ./bin/flink run -s <savepointPath> ...
@@ -152,7 +168,7 @@ You can optionally specify a `savepointDirectory` when triggering the savepoint.
 
 The run command has a savepoint flag to submit a job, which restores its state from a savepoint. The savepoint path is returned by the savepoint trigger command.
 
-#### **Dispose a savepoint**
+#### Dispose a savepoint
 
 {% highlight bash %}
 ./bin/flink savepoint -d <savepointPath>
@@ -181,41 +197,51 @@ Action "run" compiles and runs a program.
 
   Syntax: run [OPTIONS] <jar-file> <arguments>
   "run" action options:
-     -c,--class <classname>               Class with the program entry point
-                                          ("main" method or "getPlan()" method.
-                                          Only needed if the JAR file does not
-                                          specify the class in its manifest.
-     -C,--classpath <url>                 Adds a URL to each user code
-                                          classloader  on all nodes in the
-                                          cluster. The paths must specify a
-                                          protocol (e.g. file://) and be
-                                          accessible on all nodes (e.g. by means
-                                          of a NFS share). You can use this
-                                          option multiple times for specifying
-                                          more than one URL. The protocol must
-                                          be supported by the {@link
-                                          java.net.URLClassLoader}.
-     -d,--detached                        If present, runs the job in detached
-                                          mode
-     -m,--jobmanager <host:port>          Address of the JobManager (master) to
-                                          which to connect. Specify
-                                          'yarn-cluster' as the JobManager to
-                                          deploy a YARN cluster for the job. Use
-                                          this flag to connect to a different
-                                          JobManager than the one specified in
-                                          the configuration.
-     -p,--parallelism <parallelism>       The parallelism with which to run the
-                                          program. Optional flag to override the
-                                          default value specified in the
-                                          configuration.
-     -q,--sysoutLogging                   If present, supress logging output to
-                                          standard out.
-     -s,--fromSavepoint <savepointPath>   Path to a savepoint to reset the job
-                                          back to (for example
-                                          file:///flink/savepoint-1537).
-  Additional arguments if -m yarn-cluster is set:
+     -c,--class <classname>                         Class with the program entry
+                                                    point ("main" method or
+                                                    "getPlan()" method. Only
+                                                    needed if the JAR file does
+                                                    not specify the class in its
+                                                    manifest.
+     -C,--classpath <url>                           Adds a URL to each user code
+                                                    classloader  on all nodes in
+                                                    the cluster. The paths must
+                                                    specify a protocol (e.g.
+                                                    file://) and be accessible
+                                                    on all nodes (e.g. by means
+                                                    of a NFS share). You can use
+                                                    this option multiple times
+                                                    for specifying more than one
+                                                    URL. The protocol must be
+                                                    supported by the {@link
+                                                    java.net.URLClassLoader}.
+     -d,--detached                                  If present, runs the job in
+                                                    detached mode
+     -m,--jobmanager <host:port>                    Address of the JobManager
+                                                    (master) to which to
+                                                    connect. Use this flag to
+                                                    connect to a different
+                                                    JobManager than the one
+                                                    specified in the
+                                                    configuration.
+     -p,--parallelism <parallelism>                 The parallelism with which
+                                                    to run the program. Optional
+                                                    flag to override the default
+                                                    value specified in the
+                                                    configuration.
+     -q,--sysoutLogging                             If present, suppress logging
+                                                    output to standard out.
+     -s,--fromSavepoint <savepointPath>             Path to a savepoint to reset
+                                                    the job back to (for example
+                                                    file:///flink/savepoint-1537
+                                                    ).
+     -z,--zookeeperNamespace <zookeeperNamespace>   Namespace to create the
+                                                    Zookeeper sub-paths for high
+                                                    availability mode
+  Options for yarn-cluster mode:
      -yD <arg>                            Dynamic properties
      -yd,--yarndetached                   Start detached
+     -yid,--yarnapplicationId <arg>       Attach to running YARN session
      -yj,--yarnjar <arg>                  Path to Flink jar file
      -yjm,--yarnjobManagerMemory <arg>    Memory for JobManager Container [in
                                           MB]
@@ -232,6 +258,9 @@ Action "run" compiles and runs a program.
                                           (t for transfer)
      -ytm,--yarntaskManagerMemory <arg>   Memory per TaskManager Container [in
                                           MB]
+     -yz,--yarnzookeeperNamespace <arg>   Namespace to create the Zookeeper
+                                          sub-paths for high availability mode
+
 
 
 Action "info" shows the optimized execution plan of the program (JSON).
@@ -242,16 +271,13 @@ Action "info" shows the optimized execution plan of the program (JSON).
                                       method or "getPlan()" method. Only needed
                                      if the JAR file does not specify the class
                                       in its manifest.
-     -m,--jobmanager <host:port>      Address of the JobManager (master) to
-                                      which to connect. Specify 'yarn-cluster'
-                                      as the JobManager to deploy a YARN cluster
-                                      for the job. Use this flag to connect to a
-                                      different JobManager than the one
-                                      specified in the configuration.
      -p,--parallelism <parallelism>   The parallelism with which to run the
                                       program. Optional flag to override the
                                       default value specified in the
                                       configuration.
+  Options for yarn-cluster mode:
+     -yid,--yarnapplicationId <arg>   Attach to running YARN session
+
 
 
 Action "list" lists running and scheduled programs.
@@ -259,41 +285,17 @@ Action "list" lists running and scheduled programs.
   Syntax: list [OPTIONS]
   "list" action options:
      -m,--jobmanager <host:port>   Address of the JobManager (master) to which
-                                   to connect. Specify 'yarn-cluster' as the
-                                   JobManager to deploy a YARN cluster for the
-                                   job. Use this flag to connect to a different
-                                   JobManager than the one specified in the
-                                   configuration.
+                                   to connect. Use this flag to connect to a
+                                   different JobManager than the one specified
+                                   in the configuration.
      -r,--running                  Show only running programs and their JobIDs
     -s,--scheduled                Show only scheduled programs and their JobIDs
-  Additional arguments if -m yarn-cluster is set:
-     -yid <yarnApplicationId>      YARN application ID of Flink YARN session to
-                                   connect to. Must not be set if JobManager HA
-                                   is used. In this case, JobManager RPC
-                                   location is automatically retrieved from
-                                   Zookeeper.
+  Options for yarn-cluster mode:
+     -yid,--yarnapplicationId <arg>   Attach to running YARN session
 
 
-Action "cancel" cancels a running program.
 
-  Syntax: cancel [OPTIONS] <Job ID>
-  "cancel" action options:
-     -m,--jobmanager <host:port>   Address of the JobManager (master) to which
-                                   to connect. Specify 'yarn-cluster' as the
-                                   JobManager to deploy a YARN cluster for the
-                                   job. Use this flag to connect to a different
-                                   JobManager than the one specified in the
-                                   configuration.
-  Additional arguments if -m yarn-cluster is set:
-     -yid <yarnApplicationId>      YARN application ID of Flink YARN session to
-                                   connect to. Must not be set if JobManager HA
-                                   is used. In this case, JobManager RPC
-                                   location is automatically retrieved from
-                                   Zookeeper.
-
-
-Action "stop" stops a running program (streaming jobs only). There are no strong consistency
-guarantees for a stop request.
+Action "stop" stops a running program (streaming jobs only).
 
   Syntax: stop [OPTIONS] <Job ID>
   "stop" action options:
@@ -301,24 +303,40 @@ guarantees for a stop request.
                                    to connect. Use this flag to connect to a
                                    different JobManager than the one specified
                                    in the configuration.
-  Additional arguments if -m yarn-cluster is set:
-     -yid <yarnApplicationId>      YARN application ID of Flink YARN session to
-                                   connect to. Must not be set if JobManager HA
-                                   is used. In this case, JobManager RPC
-                                   location is automatically retrieved from
-                                   Zookeeper.
+  Options for yarn-cluster mode:
+     -yid,--yarnapplicationId <arg>   Attach to running YARN session
+
+
+
+Action "cancel" cancels a running program.
+
+  Syntax: cancel [OPTIONS] <Job ID>
+  "cancel" action options:
+     -m,--jobmanager <host:port>            Address of the JobManager (master)
+                                            to which to connect. Use this flag
+                                            to connect to a different JobManager
+                                            than the one specified in the
+                                            configuration.
+     -s,--withSavepoint <targetDirectory>   Trigger savepoint and cancel job.
+                                            The target directory is optional. If
+                                            no directory is specified, the
+                                            configured default directory
+                                            (state.savepoints.dir) is used.
+  Options for yarn-cluster mode:
+     -yid,--yarnapplicationId <arg>   Attach to running YARN session
+
 
 
Action "savepoint" triggers savepoints for a running job or disposes existing ones.
 
- Syntax: savepoint [OPTIONS] <Job ID>
- "savepoint" action options:
-    -d,--dispose <arg>            Path of savepoint to dispose.
-    -j,--jarfile <jarfile>        Flink program JAR file.
-    -m,--jobmanager <host:port>   Address of the JobManager (master) to which
-                                  to connect. Use this flag to connect to a
-                                  different JobManager than the one specified
-                                  in the configuration.
- Options for yarn-cluster mode:
-    -yid,--yarnapplicationId <arg>   Attach to running YARN session
+  Syntax: savepoint [OPTIONS] <Job ID> [<target directory>]
+  "savepoint" action options:
+     -d,--dispose <arg>            Path of savepoint to dispose.
+     -j,--jarfile <jarfile>        Flink program JAR file.
+     -m,--jobmanager <host:port>   Address of the JobManager (master) to which
+                                   to connect. Use this flag to connect to a
+                                   different JobManager than the one specified
+                                   in the configuration.
+  Options for yarn-cluster mode:
+     -yid,--yarnapplicationId <arg>   Attach to running YARN session
 ~~~

http://git-wip-us.apache.org/repos/asf/flink/blob/5783671c/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
index 90d3437..0572dc6 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
@@ -60,7 +60,9 @@ import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.runtime.messages.JobManagerMessages.CancelJob;
+import org.apache.flink.runtime.messages.JobManagerMessages.CancelJobWithSavepoint;
 import org.apache.flink.runtime.messages.JobManagerMessages.CancellationFailure;
+import org.apache.flink.runtime.messages.JobManagerMessages.CancellationSuccess;
 import org.apache.flink.runtime.messages.JobManagerMessages.RunningJobsStatus;
 import org.apache.flink.runtime.messages.JobManagerMessages.StopJob;
 import org.apache.flink.runtime.messages.JobManagerMessages.StoppingFailure;
@@ -556,20 +558,38 @@ public class CliFrontend {
                }
 
                String[] cleanedArgs = options.getArgs();
+
+               boolean withSavepoint = options.isWithSavepoint();
+               String targetDirectory = options.getSavepointTargetDirectory();
+
                JobID jobId;
 
+               // Figure out jobID. This is a little overly complicated, because
+               // we have to figure out whether the optional target directory
+               // is set:
+               // - cancel -s <jobID> => default target dir (JobID parsed as opt arg)
+               // - cancel -s <targetDir> <jobID> => custom target dir (parsed correctly)
                if (cleanedArgs.length > 0) {
                        String jobIdString = cleanedArgs[0];
                        try {
                                jobId = new JobID(StringUtils.hexStringToByte(jobIdString));
-                       }
-                       catch (Exception e) {
+                       } catch (Exception e) {
                                LOG.error("Error: The value for the Job ID is not a valid ID.");
                                System.out.println("Error: The value for the Job ID is not a valid ID.");
                                return 1;
                        }
-               }
-               else {
+               } else if (targetDirectory != null)  {
+                       // Try this for case: cancel -s <jobID> (default savepoint target dir)
+                       String jobIdString = targetDirectory;
+                       try {
+                               jobId = new JobID(StringUtils.hexStringToByte(jobIdString));
+                               targetDirectory = null;
+                       } catch (Exception e) {
+                               LOG.error("Missing JobID in the command line arguments.");
+                               System.out.println("Error: Specify a Job ID to cancel a job.");
+                               return 1;
+                       }
+               } else {
                        LOG.error("Missing JobID in the command line arguments.");
                        System.out.println("Error: Specify a Job ID to cancel a job.");
                        return 1;
@@ -577,13 +597,36 @@ public class CliFrontend {
 
                try {
                        ActorGateway jobManager = getJobManagerGateway(options);
-                       Future<Object> response = jobManager.ask(new CancelJob(jobId), clientTimeout);
 
+                       Object cancelMsg;
+                       if (withSavepoint) {
+                               if (targetDirectory == null) {
+                                       logAndSysout("Cancelling job " + jobId + " with savepoint to default savepoint directory.");
+                               } else {
+                                       logAndSysout("Cancelling job " + jobId + " with savepoint to " + targetDirectory + ".");
+                               }
+                               cancelMsg = new CancelJobWithSavepoint(jobId, targetDirectory);
+                       } else {
+                               logAndSysout("Cancelling job " + jobId + ".");
+                               cancelMsg = new CancelJob(jobId);
+                       }
+
+                       Future<Object> response = jobManager.ask(cancelMsg, clientTimeout);
                        final Object rc = Await.result(response, clientTimeout);
 
-                       if (rc instanceof CancellationFailure) {
+                       if (rc instanceof CancellationSuccess) {
+                               if (withSavepoint) {
+                                       CancellationSuccess success = (CancellationSuccess) rc;
+                                       String savepointPath = success.savepointPath();
+                                       logAndSysout("Cancelled job " + jobId + ". Savepoint stored in " + savepointPath + ".");
+                               } else {
+                                       logAndSysout("Cancelled job " + jobId + ".");
+                               }
+                       } else if (rc instanceof CancellationFailure) {
                                throw new Exception("Canceling the job with ID " + jobId + " failed.",
                                                ((CancellationFailure) rc).cause());
+                       } else {
+                               throw new IllegalStateException("Unexpected response: " + rc);
                        }
 
                        return 0;
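
To make the disambiguation in the hunks above easier to follow, here is a self-contained sketch. All names are hypothetical, and a plain regex stands in for `new JobID(StringUtils.hexStringToByte(...))`; this illustrates the control flow, not Flink's implementation.

```java
// Hypothetical sketch of the jobID/targetDirectory disambiguation in the
// cancel action: a 32-hex-character string is assumed to be a JobID.
public class CancelArgsSketch {

    /** Returns {jobId, targetDirectory}, or null if no job ID was given. */
    static String[] resolve(String[] positionalArgs, String savepointOptValue) {
        if (positionalArgs.length > 0) {
            // cancel [-s <targetDir>] <jobID>: JobID parsed as positional arg
            return new String[] { positionalArgs[0], savepointOptValue };
        } else if (savepointOptValue != null
                && savepointOptValue.matches("[0-9a-fA-F]{32}")) {
            // cancel -s <jobID>: the option value was really the JobID,
            // so the savepoint goes to the default target directory
            return new String[] { savepointOptValue, null };
        }
        return null; // missing JobID: the real CLI logs an error and returns 1
    }

    public static void main(String[] args) {
        String jid = "fd410d9f6f89d9d9fd410d9f6f89d9d9"; // hypothetical JobID
        System.out.println(java.util.Arrays.toString(
                resolve(new String[] { jid }, "hdfs:///savepoints")));
        System.out.println(java.util.Arrays.toString(resolve(new String[0], jid)));
        System.out.println(resolve(new String[0], "hdfs:///savepoints")); // null
    }
}
```

The same trade-off as in the real change applies: a target directory whose name parses as a JobID is taken to be a JobID when no positional argument is present.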

http://git-wip-us.apache.org/repos/asf/flink/blob/5783671c/flink-clients/src/main/java/org/apache/flink/client/cli/CancelOptions.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CancelOptions.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CancelOptions.java
index 22e9ece..54e1a23 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CancelOptions.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CancelOptions.java
@@ -19,6 +19,8 @@ package org.apache.flink.client.cli;
 
 import org.apache.commons.cli.CommandLine;
 
+import static org.apache.flink.client.cli.CliFrontendParser.CANCEL_WITH_SAVEPOINT_OPTION;
+
 /**
  * Command line options for the CANCEL command
  */
@@ -26,12 +28,28 @@ public class CancelOptions extends CommandLineOptions {
 
        private final String[] args;
 
+       /** Flag indicating whether to cancel with a savepoint. */
+       private final boolean withSavepoint;
+
+       /** Optional target directory for the savepoint. Overrides the cluster default. */
+       private final String targetDirectory;
+
        public CancelOptions(CommandLine line) {
                super(line);
                this.args = line.getArgs();
+               this.withSavepoint = line.hasOption(CANCEL_WITH_SAVEPOINT_OPTION.getOpt());
+               this.targetDirectory = line.getOptionValue(CANCEL_WITH_SAVEPOINT_OPTION.getOpt());
        }
 
        public String[] getArgs() {
                return args == null ? new String[0] : args;
        }
+
+       public boolean isWithSavepoint() {
+               return withSavepoint;
+       }
+
+       public String getSavepointTargetDirectory() {
+               return targetDirectory;
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/5783671c/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
index 9f3ef63..2527a9c 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
@@ -24,6 +24,7 @@ import org.apache.commons.cli.Option;
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.ParseException;
 import org.apache.flink.client.CliFrontend;
+import org.apache.flink.configuration.ConfigConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -36,7 +37,6 @@ public class CliFrontendParser {
 
        private static final Logger LOG = LoggerFactory.getLogger(CliFrontendParser.class);
 
-
        static final Option HELP_OPTION = new Option("h", "help", false,
                        "Show the help message for the CLI Frontend or the action.");
 
@@ -85,6 +85,11 @@ public class CliFrontendParser {
        static final Option ZOOKEEPER_NAMESPACE_OPTION = new Option("z", "zookeeperNamespace", true,
                        "Namespace to create the Zookeeper sub-paths for high availability mode");
 
+       static final Option CANCEL_WITH_SAVEPOINT_OPTION = new Option(
+                       "s", "withSavepoint", true, "Trigger savepoint and cancel job. The target " +
+                       "directory is optional. If no directory is specified, the configured default " +
+                       "directory (" + ConfigConstants.SAVEPOINT_DIRECTORY_KEY + ") is used.");
+
        static {
                HELP_OPTION.setRequired(false);
 
@@ -118,6 +123,10 @@ public class CliFrontendParser {
 
                ZOOKEEPER_NAMESPACE_OPTION.setRequired(false);
                ZOOKEEPER_NAMESPACE_OPTION.setArgName("zookeeperNamespace");
+
+               CANCEL_WITH_SAVEPOINT_OPTION.setRequired(false);
+               CANCEL_WITH_SAVEPOINT_OPTION.setArgName("targetDirectory");
+               CANCEL_WITH_SAVEPOINT_OPTION.setOptionalArg(true);
        }
 
        private static final Options RUN_OPTIONS = getRunOptions(buildGeneralOptions(new Options()));
@@ -188,6 +197,7 @@ public class CliFrontendParser {
        }
 
        private static Options getCancelOptions(Options options) {
+               options.addOption(CANCEL_WITH_SAVEPOINT_OPTION);
                options = getJobManagerAddressOption(options);
                return addCustomCliOptions(options, false);
        }
@@ -213,7 +223,6 @@ public class CliFrontendParser {
                return getJobManagerAddressOption(o);
        }
 
-
        private static Options getInfoOptionsWithoutDeprecatedOptions(Options options) {
                options.addOption(CLASS_OPTION);
                options.addOption(PARALLELISM_OPTION);
@@ -228,6 +237,7 @@ public class CliFrontendParser {
        }
 
        private static Options getCancelOptionsWithoutDeprecatedOptions(Options options) {
+               options.addOption(CANCEL_WITH_SAVEPOINT_OPTION);
                options = getJobManagerAddressOption(options);
                return options;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/5783671c/flink-clients/src/test/java/org/apache/flink/client/CliFrontendListCancelTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendListCancelTest.java b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendListCancelTest.java
index 524e7e7..53311ef 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendListCancelTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendListCancelTest.java
@@ -18,14 +18,16 @@
 
 package org.apache.flink.client;
 
-import akka.actor.*;
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Props;
+import akka.actor.Status;
 import akka.testkit.JavaTestKit;
-
-import org.apache.flink.client.cli.CommandLineOptions;
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.client.cli.CommandLineOptions;
 import org.apache.flink.runtime.akka.FlinkUntypedActor;
-import org.apache.flink.runtime.instance.AkkaActorGateway;
 import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.instance.AkkaActorGateway;
 import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
@@ -34,7 +36,10 @@ import org.junit.Test;
 import java.util.UUID;
 
 import static org.apache.flink.client.CliFrontendTestUtils.pipeSystemOutToNull;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 public class CliFrontendListCancelTest {
 
@@ -128,6 +133,68 @@ public class CliFrontendListCancelTest {
                }
        }
 
+       /**
+        * Tests cancelling with the savepoint option.
+        */
+       @Test
+       public void testCancelWithSavepoint() throws Exception {
+               {
+                       // Cancel with savepoint (no target directory)
+                       JobID jid = new JobID();
+                       UUID leaderSessionID = UUID.randomUUID();
+
+                       Props props = Props.create(CliJobManager.class, jid, 
leaderSessionID);
+                       ActorRef jm = actorSystem.actorOf(props);
+                       ActorGateway gateway = new AkkaActorGateway(jm, 
leaderSessionID);
+
+                       String[] parameters = { "-s", jid.toString() };
+                       InfoListTestCliFrontend testFrontend = new 
InfoListTestCliFrontend(gateway);
+                       assertEquals(0, testFrontend.cancel(parameters));
+               }
+
+               {
+                       // Cancel with savepoint (with target directory)
+                       JobID jid = new JobID();
+                       UUID leaderSessionID = UUID.randomUUID();
+
+                       Props props = Props.create(CliJobManager.class, jid, leaderSessionID, "targetDirectory");
+                       ActorRef jm = actorSystem.actorOf(props);
+                       ActorGateway gateway = new AkkaActorGateway(jm, leaderSessionID);
+
+                       String[] parameters = { "-s", "targetDirectory", jid.toString() };
+                       InfoListTestCliFrontend testFrontend = new InfoListTestCliFrontend(gateway);
+                       assertEquals(0, testFrontend.cancel(parameters));
+               }
+
+               {
+                       // Cancel with savepoint (with target directory), but no job ID
+                       JobID jid = new JobID();
+                       UUID leaderSessionID = UUID.randomUUID();
+
+                       Props props = Props.create(CliJobManager.class, jid, leaderSessionID, "targetDirectory");
+                       ActorRef jm = actorSystem.actorOf(props);
+                       ActorGateway gateway = new AkkaActorGateway(jm, leaderSessionID);
+
+                       String[] parameters = { "-s", "targetDirectory" };
+                       InfoListTestCliFrontend testFrontend = new InfoListTestCliFrontend(gateway);
+                       assertNotEquals(0, testFrontend.cancel(parameters));
+               }
+
+               {
+                       // Cancel with savepoint (no target directory) and no job ID
+                       JobID jid = new JobID();
+                       UUID leaderSessionID = UUID.randomUUID();
+
+                       Props props = Props.create(CliJobManager.class, jid, leaderSessionID);
+                       ActorRef jm = actorSystem.actorOf(props);
+                       ActorGateway gateway = new AkkaActorGateway(jm, leaderSessionID);
+
+                       String[] parameters = { "-s" };
+                       InfoListTestCliFrontend testFrontend = new InfoListTestCliFrontend(gateway);
+                       assertNotEquals(0, testFrontend.cancel(parameters));
+               }
+       }
+
        @Test
        public void testList() {
                try {
@@ -182,10 +249,16 @@ public class CliFrontendListCancelTest {
        protected static final class CliJobManager extends FlinkUntypedActor {
                private final JobID jobID;
                private final UUID leaderSessionID;
+               private final String targetDirectory;
 
                public CliJobManager(final JobID jobID, final UUID leaderSessionID){
+                       this(jobID, leaderSessionID, null);
+               }
+
+               public CliJobManager(final JobID jobID, final UUID leaderSessionID, String targetDirectory){
                        this.jobID = jobID;
                        this.leaderSessionID = leaderSessionID;
+                       this.targetDirectory = targetDirectory;
                }
 
                @Override
@@ -198,7 +271,7 @@ public class CliFrontendListCancelTest {
 
                                if (jobID != null && jobID.equals(cancelJob.jobID())) {
                                        getSender().tell(
-                                                       decorateMessage(new Status.Success(new Object())),
+                                                       decorateMessage(new Status.Success(new JobManagerMessages.CancellationSuccess(jobID, null))),
                                                        getSelf());
                                }
                                else {
@@ -207,6 +280,27 @@ public class CliFrontendListCancelTest {
                                                        getSelf());
                                }
                        }
+                       else if (message instanceof JobManagerMessages.CancelJobWithSavepoint) {
+                               JobManagerMessages.CancelJobWithSavepoint cancelJob = (JobManagerMessages.CancelJobWithSavepoint) message;
+
+                               if (jobID != null && jobID.equals(cancelJob.jobID())) {
+                                       if (targetDirectory == null && cancelJob.savepointDirectory() == null ||
+                                                       targetDirectory != null && targetDirectory.equals(cancelJob.savepointDirectory())) {
+                                               getSender().tell(
+                                                               decorateMessage(new JobManagerMessages.CancellationSuccess(jobID, targetDirectory)),
+                                                               getSelf());
+                                       } else {
+                                               getSender().tell(
+                                                               decorateMessage(new JobManagerMessages.CancellationFailure(jobID, new Exception("Wrong target directory"))),
+                                                               getSelf());
+                                       }
+                               }
+                               else {
+                                       getSender().tell(
+                                                       decorateMessage(new JobManagerMessages.CancellationFailure(jobID, new Exception("Wrong or no JobID"))),
+                                                       getSelf());
+                               }
+                       }
                        else if (message instanceof JobManagerMessages.RequestRunningJobsStatus$) {
                                getSender().tell(
                                                decorateMessage(new JobManagerMessages.RunningJobsStatus()),

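The tests above exercise the new `-s [targetDirectory]` option, whose argument is optional: `-s <jobID>` savepoints to the default directory, while `-s <targetDir> <jobID>` names one explicitly. The following is a stripped-down sketch of how such an optional flag argument can be disambiguated by the number of remaining arguments; `CancelArgs` and its fields are hypothetical illustration names, not the actual `CliFrontend` code (which parses options via commons-cli).

```java
// Hypothetical sketch of parsing "cancel [-s [targetDirectory]] <jobID>".
// An argument after -s is treated as the target directory only if another
// argument (the job ID) still follows; otherwise -s means "default directory".
final class CancelArgs {
    final String jobId;
    final boolean withSavepoint;
    final String targetDirectory; // null means "use cluster default"

    CancelArgs(String jobId, boolean withSavepoint, String targetDirectory) {
        this.jobId = jobId;
        this.withSavepoint = withSavepoint;
        this.targetDirectory = targetDirectory;
    }

    static CancelArgs parse(String[] args) {
        boolean withSavepoint = false;
        String targetDirectory = null;
        String jobId = null;
        int i = 0;
        if (i < args.length && args[i].equals("-s")) {
            withSavepoint = true;
            i++;
            // Exactly two arguments remain => the first is the target directory
            if (args.length - i == 2) {
                targetDirectory = args[i++];
            }
        }
        if (i < args.length) {
            jobId = args[i];
        }
        if (jobId == null) {
            throw new IllegalArgumentException("Missing job ID");
        }
        return new CancelArgs(jobId, withSavepoint, targetDirectory);
    }
}
```

This mirrors why the test expects a non-zero exit code for `-s` alone: without a job ID the command cannot be interpreted.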
http://git-wip-us.apache.org/repos/asf/flink/blob/5783671c/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index ab4bde7..00028c4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -287,7 +287,7 @@ public class CheckpointCoordinator {
                checkNotNull(targetDirectory, "Savepoint target directory");
 
                CheckpointProperties props = CheckpointProperties.forStandardSavepoint();
-               CheckpointTriggerResult result = triggerCheckpoint(timestamp, props, targetDirectory);
+               CheckpointTriggerResult result = triggerCheckpoint(timestamp, props, targetDirectory, false);

                if (result.isSuccess()) {
                        return result.getPendingCheckpoint().getCompletionFuture();
@@ -303,13 +303,21 @@ public class CheckpointCoordinator {
         * timestamp.
         *
         * @param timestamp The timestamp for the checkpoint.
+        * @param isPeriodic Flag indicating whether this triggered checkpoint is
+        * periodic. If this flag is true, but the periodic scheduler is disabled,
+        * the checkpoint will be declined.
         * @return <code>true</code> if triggering the checkpoint succeeded.
         */
-       public boolean triggerCheckpoint(long timestamp) throws Exception {
-               return triggerCheckpoint(timestamp, checkpointProperties, checkpointDirectory).isSuccess();
+       public boolean triggerCheckpoint(long timestamp, boolean isPeriodic) throws Exception {
+               return triggerCheckpoint(timestamp, checkpointProperties, checkpointDirectory, isPeriodic).isSuccess();
        }

-       CheckpointTriggerResult triggerCheckpoint(long timestamp, CheckpointProperties props, String targetDirectory) throws Exception {
+       CheckpointTriggerResult triggerCheckpoint(
+                       long timestamp,
+                       CheckpointProperties props,
+                       String targetDirectory,
+                       boolean isPeriodic) throws Exception {
+
                // Sanity check
                if (props.externalizeCheckpoint() && targetDirectory == null) {
                        throw new IllegalStateException("No target directory specified to persist checkpoint to.");
@@ -322,6 +330,11 @@ public class CheckpointCoordinator {
                                return new CheckpointTriggerResult(CheckpointDeclineReason.COORDINATOR_SHUTDOWN);
                        }
 
+                       // Don't allow periodic checkpoint if scheduling has been disabled
+                       if (isPeriodic && !periodicScheduling) {
+                               return new CheckpointTriggerResult(CheckpointDeclineReason.PERIODIC_SCHEDULER_SHUTDOWN);
+                       }
+
                        // validate whether the checkpoint can be triggered, with respect to the limit of
                        // concurrent checkpoints, and the minimum time between checkpoints.
                        // these checks are not relevant for savepoints
@@ -417,6 +430,7 @@ public class CheckpointCoordinator {
                                        checkpointID,
                                        timestamp,
                                        ackTasks,
+                                       isPeriodic,
                                        props,
                                        targetDirectory);
 
@@ -580,7 +594,7 @@ public class CheckpointCoordinator {
                                }
                                if (!haveMoreRecentPending && !triggerRequestQueued) {
                                        LOG.info("Triggering new checkpoint because of discarded checkpoint " + checkpointId);
-                                       triggerCheckpoint(System.currentTimeMillis(), checkpoint.getProps(), checkpoint.getTargetDirectory());
+                                       triggerCheckpoint(System.currentTimeMillis(), checkpoint.getProps(), checkpoint.getTargetDirectory(), checkpoint.isPeriodic());
                                } else if (!haveMoreRecentPending) {
                                        LOG.info("Promoting queued checkpoint request because of discarded checkpoint " + checkpointId);
                                        triggerQueuedRequests();
@@ -1084,7 +1098,7 @@ public class CheckpointCoordinator {
                @Override
                public void run() {
                        try {
-                               triggerCheckpoint(System.currentTimeMillis());
+                               triggerCheckpoint(System.currentTimeMillis(), true);
                        }
                        catch (Exception e) {
                                LOG.error("Exception while triggering checkpoint", e);

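The `isPeriodic` flag above guards the trigger path: once `stopCheckpointScheduler()` has run, periodic triggers are declined with `PERIODIC_SCHEDULER_SHUTDOWN`, while manually requested checkpoints and savepoints still go through. A minimal standalone model of that guard (simplified names, not the actual coordinator classes):

```java
// Simplified model of the periodic-trigger guard added to triggerCheckpoint().
// Stopping the scheduler declines periodic triggers only; manual triggers pass.
final class MiniCoordinator {
    enum Decline { NONE, PERIODIC_SCHEDULER_SHUTDOWN }

    private boolean periodicScheduling = true;

    void stopCheckpointScheduler() {
        periodicScheduling = false;
    }

    Decline triggerCheckpoint(boolean isPeriodic) {
        // Don't allow a periodic checkpoint if scheduling has been disabled
        if (isPeriodic && !periodicScheduling) {
            return Decline.PERIODIC_SCHEDULER_SHUTDOWN;
        }
        return Decline.NONE;
    }
}
```

This is what lets cancel-with-savepoint guarantee that no periodic checkpoint sneaks in between the savepoint and the cancellation.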
http://git-wip-us.apache.org/repos/asf/flink/blob/5783671c/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointDeclineReason.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointDeclineReason.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointDeclineReason.java
index 2cc9094..60fe657 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointDeclineReason.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointDeclineReason.java
@@ -25,6 +25,8 @@ public enum CheckpointDeclineReason {
 
        COORDINATOR_SHUTDOWN("Checkpoint coordinator is shut down."),
 
+       PERIODIC_SCHEDULER_SHUTDOWN("Periodic checkpoint scheduler is shut down."),
+
        ALREADY_QUEUED("Another checkpoint request has already been queued."),

        TOO_MANY_CONCURRENT_CHECKPOINTS("The maximum number of concurrent checkpoints is exceeded"),

http://git-wip-us.apache.org/repos/asf/flink/blob/5783671c/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
index 983f1d7..6f50392 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
@@ -67,6 +67,9 @@ public class PendingCheckpoint {
 
        private final Map<ExecutionAttemptID, ExecutionVertex> notYetAcknowledgedTasks;

+       /** Flag indicating whether the checkpoint is triggered as part of periodic scheduling. */
+       private final boolean isPeriodic;
+
        /**
         * The checkpoint properties. If the checkpoint should be persisted
         * externally, it happens in {@link #finalizeCheckpoint()}.
@@ -90,12 +93,14 @@ public class PendingCheckpoint {
                        long checkpointId,
                        long checkpointTimestamp,
                        Map<ExecutionAttemptID, ExecutionVertex> verticesToConfirm,
+                       boolean isPeriodic,
                        CheckpointProperties props,
                        String targetDirectory) {
                this.jobId = checkNotNull(jobId);
                this.checkpointId = checkpointId;
                this.checkpointTimestamp = checkpointTimestamp;
                this.notYetAcknowledgedTasks = checkNotNull(verticesToConfirm);
+               this.isPeriodic = isPeriodic;
                this.taskStates = new HashMap<>();
                this.props = checkNotNull(props);
                this.targetDirectory = targetDirectory;
@@ -147,6 +152,10 @@ public class PendingCheckpoint {
                return discarded;
        }
 
+       boolean isPeriodic() {
+               return isPeriodic;
+       }
+
        /**
         * Checks whether this checkpoint can be subsumed or whether it should always continue, regardless
         * of newer checkpoints in progress.

http://git-wip-us.apache.org/repos/asf/flink/blob/5783671c/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 450e810..cca0124 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -578,6 +578,62 @@ class JobManager(
           )
       }
 
+    case CancelJobWithSavepoint(jobId, savepointDirectory) =>
+      try {
+        val targetDirectory = if (savepointDirectory != null) {
+          savepointDirectory
+        } else {
+          defaultSavepointDir
+        }
+
+        log.info(s"Trying to cancel job $jobId with savepoint to $targetDirectory")
+
+        currentJobs.get(jobId) match {
+          case Some((executionGraph, _)) =>
+            // We don't want any checkpoint between the savepoint and cancellation
+            val coord = executionGraph.getCheckpointCoordinator
+            coord.stopCheckpointScheduler()
+
+            // Trigger the savepoint
+            val future = coord.triggerSavepoint(System.currentTimeMillis(), targetDirectory)
+
+            val senderRef = sender()
+            future.handleAsync[Void](
+              new BiFunction[CompletedCheckpoint, Throwable, Void] {
+                override def apply(success: CompletedCheckpoint, cause: Throwable): Void = {
+                  if (success != null) {
+                    val path = success.getExternalPath()
+                    log.info(s"Savepoint stored in $path. Now cancelling $jobId.")
+                    executionGraph.cancel()
+                    senderRef ! decorateMessage(CancellationSuccess(jobId, path))
+                  } else {
+                    val msg = CancellationFailure(
+                      jobId,
+                      new Exception("Failed to trigger savepoint.", cause))
+                    senderRef ! decorateMessage(msg)
+                  }
+                  null
+                }
+              },
+              context.dispatcher)
+
+          case None =>
+            log.info(s"No job found with ID $jobId.")
+            sender ! decorateMessage(
+              CancellationFailure(
+                jobId,
+                new IllegalArgumentException(s"No job found with ID $jobId."))
+            )
+        }
+      } catch {
+        case t: Throwable =>
+          log.info(s"Failure during cancellation of job $jobId with savepoint.", t)
+          sender ! decorateMessage(
+            CancellationFailure(
+              jobId,
+              new Exception(s"Failed to cancel job $jobId with savepoint.", t)))
+      }
+
     case StopJob(jobID) =>
       log.info(s"Trying to stop job with ID $jobID.")
 

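The handler above enforces a strict ordering: stop the periodic scheduler, trigger the savepoint, and cancel the job only once the savepoint future completes successfully. The same sequencing, sketched with `CompletableFuture` and placeholder interfaces (`Coordinator`, `Job`, and `CancelWithSavepoint` are illustrative names, not the Flink APIs):

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical interfaces standing in for the checkpoint coordinator and job.
interface Coordinator {
    void stopCheckpointScheduler();
    CompletableFuture<String> triggerSavepoint(String targetDirectory); // resolves to the savepoint path
}

interface Job {
    void cancel();
}

final class CancelWithSavepoint {
    /**
     * Returns a future holding the savepoint path. The job is cancelled only
     * after the savepoint completed, so no checkpoint can run in between.
     */
    static CompletableFuture<String> run(Coordinator coord, Job job, String targetDirectory) {
        // 1) No more periodic checkpoints between the savepoint and cancellation
        coord.stopCheckpointScheduler();
        // 2) Trigger the savepoint, 3) cancel only on success
        return coord.triggerSavepoint(targetDirectory)
                .thenApply(path -> {
                    job.cancel();
                    return path;
                });
    }
}
```

If the savepoint fails, `thenApply` never runs, which mirrors the `CancellationFailure` branch: the job keeps running and the caller gets the error.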
http://git-wip-us.apache.org/repos/asf/flink/blob/5783671c/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
index fd45cda..4cf6a02 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
@@ -111,6 +111,21 @@ object JobManagerMessages {
   case class CancelJob(jobID: JobID) extends RequiresLeaderSessionID
 
   /**
+    * Cancels the job with the given [[jobID]] at the JobManager. Before cancellation a savepoint
+    * is triggered without any other checkpoints in between. The result of the cancellation is
+    * the path of the triggered savepoint on success or an exception.
+    *
+    * @param jobID ID of the job to cancel
+    * @param savepointDirectory Optional target directory for the savepoint.
+    *                           If no target directory is specified here, the
+    *                           cluster default is used.
+    */
+  case class CancelJobWithSavepoint(
+      jobID: JobID,
+      savepointDirectory: String = null)
+    extends RequiresLeaderSessionID
+
+  /**
    * Stops a (streaming) job with the given [[jobID]] at the JobManager. The result of
    * stopping is sent back to the sender as a [[StoppingResponse]] message.
    *
@@ -280,7 +295,9 @@ object JobManagerMessages {
    * Denotes a successful job cancellation
    * @param jobID
    */
-  case class CancellationSuccess(jobID: JobID) extends CancellationResponse
+  case class CancellationSuccess(
+    jobID: JobID,
+    savepointPath: String = null) extends CancellationResponse
 
   /**
    * Denotes a failed job cancellation

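As defined above, `CancelJobWithSavepoint` defaults `savepointDirectory` to `null` (meaning "use the cluster default"), and `CancellationSuccess` now carries an optional savepoint path. A plain-Java model of that request/response shape, for illustration only (these value classes are not the Scala case classes):

```java
// Illustrative value types mirroring the message protocol above.
final class CancelJobWithSavepoint {
    final String jobId;
    final String savepointDirectory; // null => use cluster default

    CancelJobWithSavepoint(String jobId) {
        this(jobId, null); // mirrors the Scala default argument
    }

    CancelJobWithSavepoint(String jobId, String savepointDirectory) {
        this.jobId = jobId;
        this.savepointDirectory = savepointDirectory;
    }
}

final class CancellationSuccess {
    final String jobId;
    final String savepointPath; // null for a plain CancelJob

    CancellationSuccess(String jobId, String savepointPath) {
        this.jobId = jobId;
        this.savepointPath = savepointPath;
    }
}
```

Keeping `savepointPath` nullable lets the existing `CancelJob` path reuse the same success message, as the mock JobManager in the CLI test does.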
http://git-wip-us.apache.org/repos/asf/flink/blob/5783671c/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index 7b0e819..2a20c6c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -135,7 +135,7 @@ public class CheckpointCoordinatorTest {
                        assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
 
                        // trigger the first checkpoint. this should not succeed
-                       assertFalse(coord.triggerCheckpoint(timestamp));
+                       assertFalse(coord.triggerCheckpoint(timestamp, false));
 
                        // still, nothing should be happening
                        assertEquals(0, coord.getNumberOfPendingCheckpoints());
@@ -188,7 +188,7 @@ public class CheckpointCoordinatorTest {
                        assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
 
                        // trigger the first checkpoint. this should not succeed
-                       assertFalse(coord.triggerCheckpoint(timestamp));
+                       assertFalse(coord.triggerCheckpoint(timestamp, false));
 
                        // still, nothing should be happening
                        assertEquals(0, coord.getNumberOfPendingCheckpoints());
@@ -239,7 +239,7 @@ public class CheckpointCoordinatorTest {
                        assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
 
                        // trigger the first checkpoint. this should not succeed
-                       assertFalse(coord.triggerCheckpoint(timestamp));
+                       assertFalse(coord.triggerCheckpoint(timestamp, false));
 
                        // still, nothing should be happening
                        assertEquals(0, coord.getNumberOfPendingCheckpoints());
@@ -290,7 +290,7 @@ public class CheckpointCoordinatorTest {
                        assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
 
                        // trigger the first checkpoint. this should succeed
-                       assertTrue(coord.triggerCheckpoint(timestamp));
+                       assertTrue(coord.triggerCheckpoint(timestamp, false));
 
                        // validate that we have a pending checkpoint
                        assertEquals(1, coord.getNumberOfPendingCheckpoints());
@@ -417,10 +417,10 @@ public class CheckpointCoordinatorTest {
                        assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
 
                        // trigger the first checkpoint. this should succeed
-                       assertTrue(coord.triggerCheckpoint(timestamp));
+                       assertTrue(coord.triggerCheckpoint(timestamp, false));
 
                        // trigger second checkpoint, should also succeed
-                       assertTrue(coord.triggerCheckpoint(timestamp + 2));
+                       assertTrue(coord.triggerCheckpoint(timestamp + 2, false));
 
                        // validate that we have a pending checkpoint
                        assertEquals(2, coord.getNumberOfPendingCheckpoints());
@@ -538,7 +538,7 @@ public class CheckpointCoordinatorTest {
                        assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
 
                        // trigger the first checkpoint. this should succeed
-                       assertTrue(coord.triggerCheckpoint(timestamp));
+                       assertTrue(coord.triggerCheckpoint(timestamp, false));
 
                        // validate that we have a pending checkpoint
                        assertEquals(1, coord.getNumberOfPendingCheckpoints());
@@ -608,7 +608,7 @@ public class CheckpointCoordinatorTest {
                        // trigger another checkpoint and see that this one replaces the other checkpoint
                        // ---------------
                        final long timestampNew = timestamp + 7;
-                       coord.triggerCheckpoint(timestampNew);
+                       coord.triggerCheckpoint(timestampNew, false);
 
                        long checkpointIdNew = coord.getPendingCheckpoints().entrySet().iterator().next().getKey();
                        CheckpointMetaData checkpointMetaDataNew = new CheckpointMetaData(checkpointIdNew, 0L);
@@ -692,7 +692,7 @@ public class CheckpointCoordinatorTest {
                        assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
 
                        // trigger the first checkpoint. this should succeed
-                       assertTrue(coord.triggerCheckpoint(timestamp1));
+                       assertTrue(coord.triggerCheckpoint(timestamp1, false));
 
                        assertEquals(1, coord.getNumberOfPendingCheckpoints());
                        assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
@@ -713,7 +713,7 @@ public class CheckpointCoordinatorTest {
 
                        // start the second checkpoint
                        // trigger the first checkpoint. this should succeed
-                       assertTrue(coord.triggerCheckpoint(timestamp2));
+                       assertTrue(coord.triggerCheckpoint(timestamp2, false));
 
                        assertEquals(2, coord.getNumberOfPendingCheckpoints());
                        assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
@@ -832,7 +832,7 @@ public class CheckpointCoordinatorTest {
                        assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
 
                        // trigger the first checkpoint. this should succeed
-                       assertTrue(coord.triggerCheckpoint(timestamp1));
+                       assertTrue(coord.triggerCheckpoint(timestamp1, false));
 
                        assertEquals(1, coord.getNumberOfPendingCheckpoints());
                        assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
@@ -853,7 +853,7 @@ public class CheckpointCoordinatorTest {
 
                        // start the second checkpoint
                        // trigger the first checkpoint. this should succeed
-                       assertTrue(coord.triggerCheckpoint(timestamp2));
+                       assertTrue(coord.triggerCheckpoint(timestamp2, false));
 
                        assertEquals(2, coord.getNumberOfPendingCheckpoints());
                        assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
@@ -955,7 +955,7 @@ public class CheckpointCoordinatorTest {
                                        new DisabledCheckpointStatsTracker());
 
                        // trigger a checkpoint, partially acknowledged
-                       assertTrue(coord.triggerCheckpoint(timestamp));
+                       assertTrue(coord.triggerCheckpoint(timestamp, false));
                        assertEquals(1, coord.getNumberOfPendingCheckpoints());
 
                        PendingCheckpoint checkpoint = coord.getPendingCheckpoints().values().iterator().next();
@@ -1023,7 +1023,7 @@ public class CheckpointCoordinatorTest {
                                        null,
                                        new DisabledCheckpointStatsTracker());
 
-                       assertTrue(coord.triggerCheckpoint(timestamp));
+                       assertTrue(coord.triggerCheckpoint(timestamp, false));
 
                        long checkpointId = coord.getPendingCheckpoints().keySet().iterator().next();
 
@@ -1431,10 +1431,10 @@ public class CheckpointCoordinatorTest {
                CheckpointMetaData checkpointMetaDataS1 = new CheckpointMetaData(savepointId1, 0L);
                assertEquals(1, coord.getNumberOfPendingCheckpoints());
 
-               assertTrue(coord.triggerCheckpoint(timestamp + 1));
+               assertTrue(coord.triggerCheckpoint(timestamp + 1, false));
                assertEquals(2, coord.getNumberOfPendingCheckpoints());
 
-               assertTrue(coord.triggerCheckpoint(timestamp + 2));
+               assertTrue(coord.triggerCheckpoint(timestamp + 2, false));
                long checkpointId2 = counter.getLast();
                assertEquals(3, coord.getNumberOfPendingCheckpoints());
 
@@ -1450,7 +1450,7 @@ public class CheckpointCoordinatorTest {
                assertFalse(coord.getPendingCheckpoints().get(savepointId1).isDiscarded());
                assertFalse(savepointFuture1.isDone());
 
-               assertTrue(coord.triggerCheckpoint(timestamp + 3));
+               assertTrue(coord.triggerCheckpoint(timestamp + 3, false));
                assertEquals(2, coord.getNumberOfPendingCheckpoints());
 
                Future<CompletedCheckpoint> savepointFuture2 = coord.triggerSavepoint(timestamp + 4, savepointDir);
@@ -1841,7 +1841,7 @@ public class CheckpointCoordinatorTest {
                                new DisabledCheckpointStatsTracker());
 
                // trigger the checkpoint
-               coord.triggerCheckpoint(timestamp);
+               coord.triggerCheckpoint(timestamp, false);
 
                assertTrue(coord.getPendingCheckpoints().keySet().size() == 1);
                long checkpointId = Iterables.getOnlyElement(coord.getPendingCheckpoints().keySet());
@@ -1946,7 +1946,7 @@ public class CheckpointCoordinatorTest {
                                new DisabledCheckpointStatsTracker());
 
                // trigger the checkpoint
-               coord.triggerCheckpoint(timestamp);
+               coord.triggerCheckpoint(timestamp, false);
 
                assertTrue(coord.getPendingCheckpoints().keySet().size() == 1);
                long checkpointId = Iterables.getOnlyElement(coord.getPendingCheckpoints().keySet());
@@ -2061,7 +2061,7 @@ public class CheckpointCoordinatorTest {
                                new DisabledCheckpointStatsTracker());
 
                // trigger the checkpoint
-               coord.triggerCheckpoint(timestamp);
+               coord.triggerCheckpoint(timestamp, false);
 
                assertTrue(coord.getPendingCheckpoints().keySet().size() == 1);
                long checkpointId = Iterables.getOnlyElement(coord.getPendingCheckpoints().keySet());
@@ -2184,7 +2184,7 @@ public class CheckpointCoordinatorTest {
                                new DisabledCheckpointStatsTracker());
 
                // trigger the checkpoint
-               coord.triggerCheckpoint(timestamp);
+               coord.triggerCheckpoint(timestamp, false);
 
                assertTrue(coord.getPendingCheckpoints().keySet().size() == 1);
                long checkpointId = Iterables.getOnlyElement(coord.getPendingCheckpoints().keySet());
@@ -2298,7 +2298,7 @@ public class CheckpointCoordinatorTest {
                                        "fake-directory",
                                        new DisabledCheckpointStatsTracker());
 
-                       assertTrue(coord.triggerCheckpoint(timestamp));
+                       assertTrue(coord.triggerCheckpoint(timestamp, false));
 
                        for (PendingCheckpoint checkpoint : coord.getPendingCheckpoints().values()) {
                                CheckpointProperties props = checkpoint.getProps();
@@ -2654,6 +2654,48 @@ public class CheckpointCoordinatorTest {
                }
        }
 
+       @Test
+       public void testStopPeriodicScheduler() throws Exception {
+               // create some mock Execution vertices that receive the checkpoint trigger messages
+               final ExecutionAttemptID attemptID1 = new ExecutionAttemptID();
+               ExecutionVertex vertex1 = mockExecutionVertex(attemptID1);
+
+               // set up the coordinator and validate the initial state
+               CheckpointCoordinator coord = new CheckpointCoordinator(
+                               new JobID(),
+                               600000,
+                               600000,
+                               0,
+                               Integer.MAX_VALUE,
+                               ExternalizedCheckpointSettings.none(),
+                               new ExecutionVertex[] { vertex1 },
+                               new ExecutionVertex[] { vertex1 },
+                               new ExecutionVertex[] { vertex1 },
+                               new StandaloneCheckpointIDCounter(),
+                               new StandaloneCompletedCheckpointStore(1),
+                               null,
+                               new DisabledCheckpointStatsTracker());
+
+               // Periodic
+               CheckpointTriggerResult triggerResult = coord.triggerCheckpoint(
+                               System.currentTimeMillis(),
+                               CheckpointProperties.forStandardCheckpoint(),
+                               null,
+                               true);
+
+               assertEquals(true, triggerResult.isFailure());
+               assertEquals(CheckpointDeclineReason.PERIODIC_SCHEDULER_SHUTDOWN, triggerResult.getFailureReason());
+
+               // Not periodic
+               triggerResult = coord.triggerCheckpoint(
+                               System.currentTimeMillis(),
+                               CheckpointProperties.forStandardCheckpoint(),
+                               null,
+                               false);
+
+               assertEquals(false, triggerResult.isFailure());
+       }
+
        private void testCreateKeyGroupPartitions(int maxParallelism, int parallelism) {
                List<KeyGroupRange> ranges = CheckpointCoordinator.createKeyGroupPartitions(maxParallelism, parallelism);
                for (int i = 0; i < maxParallelism; ++i) {
@@ -2664,7 +2706,6 @@ public class CheckpointCoordinatorTest {
                }
        }
 
-
        @Test
        public void testPartitionableStateRepartitioning() {
                Random r = new Random(42);
@@ -2809,7 +2850,7 @@ public class CheckpointCoordinatorTest {
                CheckpointProperties props = CheckpointProperties.forStandardSavepoint();
                String targetDirectory = "xjasdkjakshdmmmxna";
 
-               CheckpointTriggerResult triggerResult = coord.triggerCheckpoint(timestamp, props, targetDirectory);
+               CheckpointTriggerResult triggerResult = coord.triggerCheckpoint(timestamp, props, targetDirectory, false);
                assertEquals(true, triggerResult.isSuccess());
 
                // validate that we have a pending checkpoint
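Aside: the `boolean` argument threaded through `triggerCheckpoint` in the hunks above marks whether a trigger came from the periodic scheduler. The guard that `testStopPeriodicScheduler` exercises can be sketched as a hypothetical miniature; this is not Flink's actual `CheckpointCoordinator` (which returns a `CheckpointTriggerResult` carrying `CheckpointDeclineReason.PERIODIC_SCHEDULER_SHUTDOWN` rather than a bare boolean), and the names `MiniCoordinator`, `startPeriodicScheduler`, and `stopPeriodicScheduler` are illustrative only:

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch: decline periodic triggers unless the periodic
// scheduler is running; manual (savepoint) triggers always pass the guard.
class MiniCoordinator {
    private final AtomicBoolean periodicSchedulingActive = new AtomicBoolean(false);

    void startPeriodicScheduler() { periodicSchedulingActive.set(true); }

    void stopPeriodicScheduler() { periodicSchedulingActive.set(false); }

    /** Returns true if the trigger is accepted, false if declined. */
    boolean triggerCheckpoint(long timestamp, boolean isPeriodic) {
        if (isPeriodic && !periodicSchedulingActive.get()) {
            return false; // corresponds to the PERIODIC_SCHEDULER_SHUTDOWN decline
        }
        return true; // a real coordinator would create a PendingCheckpoint here
    }
}
```

This mirrors the test above: with the scheduler never started (or already stopped), a periodic trigger is declined while a manual one still succeeds.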

http://git-wip-us.apache.org/repos/asf/flink/blob/5783671c/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
index b4dcab5..950526c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
@@ -110,7 +110,7 @@ public class CheckpointStateRestoreTest {
 
                        // create ourselves a checkpoint with state
                        final long timestamp = 34623786L;
-                       coord.triggerCheckpoint(timestamp);
+                       coord.triggerCheckpoint(timestamp, false);
 
                        PendingCheckpoint pending = coord.getPendingCheckpoints().values().iterator().next();
                        final long checkpointId = pending.getCheckpointId();
@@ -209,7 +209,7 @@ public class CheckpointStateRestoreTest {
 
                        // create ourselves a checkpoint with state
                        final long timestamp = 34623786L;
-                       coord.triggerCheckpoint(timestamp);
+                       coord.triggerCheckpoint(timestamp, false);
 
                        PendingCheckpoint pending = coord.getPendingCheckpoints().values().iterator().next();
                        final long checkpointId = pending.getCheckpointId();

http://git-wip-us.apache.org/repos/asf/flink/blob/5783671c/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
index 2667743..84f0e1f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
@@ -210,7 +210,7 @@ public class PendingCheckpointTest {
 
        private static PendingCheckpoint createPendingCheckpoint(CheckpointProperties props, String targetDirectory) {
                Map<ExecutionAttemptID, ExecutionVertex> ackTasks = new HashMap<>(ACK_TASKS);
-               return new PendingCheckpoint(new JobID(), 0, 1, ackTasks, props, targetDirectory);
+               return new PendingCheckpoint(new JobID(), 0, 1, ackTasks, false, props, targetDirectory);
        }
 
        @SuppressWarnings("unchecked")

http://git-wip-us.apache.org/repos/asf/flink/blob/5783671c/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
index 9277029..5ec6991 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
@@ -60,7 +60,6 @@ import org.apache.flink.runtime.state.ChainedStateHandle;
 import org.apache.flink.runtime.state.CheckpointStateHandles;
 import org.apache.flink.runtime.state.KeyGroupsStateHandle;
 import org.apache.flink.runtime.state.OperatorStateHandle;
-import org.apache.flink.runtime.state.RetrievableStreamStateHandle;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
 import org.apache.flink.runtime.taskmanager.TaskManager;
@@ -447,9 +446,9 @@ public class JobManagerHARecoveryTest {
 
                @Override
                public void setInitialState(
-                       ChainedStateHandle<StreamStateHandle> chainedState,
-                       List<KeyGroupsStateHandle> keyGroupsState,
-                       List<Collection<OperatorStateHandle>> partitionableOperatorState) throws Exception {
+                               ChainedStateHandle<StreamStateHandle> chainedState,
+                               List<KeyGroupsStateHandle> keyGroupsState,
+                               List<Collection<OperatorStateHandle>> partitionableOperatorState) throws Exception {
                        int subtaskIndex = getIndexInSubtaskGroup();
                        if (subtaskIndex < recoveredStates.length) {
                                try (FSDataInputStream in = chainedState.get(0).openInputStream()) {
@@ -465,10 +464,8 @@ public class JobManagerHARecoveryTest {
                                                String.valueOf(UUID.randomUUID()),
                                                InstantiationUtil.serializeObject(checkpointMetaData.getCheckpointId()));
 
-                               RetrievableStreamStateHandle<Long> state = new RetrievableStreamStateHandle<Long>(byteStreamStateHandle);
-
                                ChainedStateHandle<StreamStateHandle> chainedStateHandle =
-                                               new ChainedStateHandle<StreamStateHandle>(Collections.singletonList(state));
+                                               new ChainedStateHandle<StreamStateHandle>(Collections.singletonList(byteStreamStateHandle));
 
                                CheckpointStateHandles checkpointStateHandles =
                                                new CheckpointStateHandles(chainedStateHandle, null, Collections.<KeyGroupsStateHandle>emptyList());

http://git-wip-us.apache.org/repos/asf/flink/blob/5783671c/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
index 8d150ac..183477a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.jobmanager;
 
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
+import akka.actor.PoisonPill;
 import akka.testkit.JavaTestKit;
 import com.typesafe.config.Config;
 import org.apache.flink.api.common.JobID;
@@ -27,6 +28,7 @@ import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.akka.ListeningBehaviour;
+import org.apache.flink.runtime.checkpoint.CheckpointDeclineReason;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
@@ -43,8 +45,15 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
+import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
+import org.apache.flink.runtime.jobmanager.JobManagerHARecoveryTest.BlockingStatefulInvokable;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.leaderretrieval.StandaloneLeaderRetrievalService;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.messages.JobManagerMessages.CancellationFailure;
+import org.apache.flink.runtime.messages.JobManagerMessages.CancellationResponse;
+import org.apache.flink.runtime.messages.JobManagerMessages.CancellationSuccess;
 import org.apache.flink.runtime.messages.JobManagerMessages.RequestPartitionState;
 import org.apache.flink.runtime.messages.JobManagerMessages.StopJob;
 import org.apache.flink.runtime.messages.JobManagerMessages.StoppingFailure;
@@ -67,12 +76,17 @@ import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.NotifyWhe
 import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.RequestExecutionGraph;
 import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.WaitForAllVerticesToBeRunning;
 import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.WaitForAllVerticesToBeRunningOrFinished;
+import org.apache.flink.runtime.testingUtils.TestingMemoryArchivist;
 import org.apache.flink.runtime.testingUtils.TestingTaskManager;
+import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.testutils.StoppableInvokable;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import scala.Option;
 import scala.Some;
 import scala.Tuple2;
 import scala.concurrent.Await;
@@ -81,7 +95,9 @@ import scala.concurrent.duration.Deadline;
 import scala.concurrent.duration.FiniteDuration;
 import scala.reflect.ClassTag$;
 
+import java.io.File;
 import java.net.InetAddress;
+import java.util.Collections;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 
@@ -94,6 +110,7 @@ import static org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.No
 import static org.apache.flink.runtime.testingUtils.TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT;
 import static org.apache.flink.runtime.testingUtils.TestingUtils.startTestingCluster;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
@@ -101,6 +118,9 @@ import static org.junit.Assert.fail;
 
 public class JobManagerTest {
 
+       @Rule
+       public TemporaryFolder tmpFolder = new TemporaryFolder();
+
        private static ActorSystem system;
 
        @BeforeClass
@@ -562,4 +582,126 @@ public class JobManagerTest {
                JobStatusIs jobStatus = Await.result(failedFuture, deadline.timeLeft());
                assertEquals(JobStatus.FAILED, jobStatus.state());
        }
+
+       @Test
+       public void testCancelWithSavepoint() throws Exception {
+               File defaultSavepointDir = tmpFolder.newFolder();
+
+               FiniteDuration timeout = new FiniteDuration(30, TimeUnit.SECONDS);
+               Configuration config = new Configuration();
+               config.setString(ConfigConstants.SAVEPOINT_DIRECTORY_KEY, defaultSavepointDir.getAbsolutePath());
+
+               ActorSystem actorSystem = null;
+               ActorGateway jobManager = null;
+               ActorGateway archiver = null;
+               ActorGateway taskManager = null;
+               try {
+                       actorSystem = AkkaUtils.createLocalActorSystem(new Configuration());
+
+                       Tuple2<ActorRef, ActorRef> master = JobManager.startJobManagerActors(
+                                       config,
+                                       actorSystem,
+                                       Option.apply("jm"),
+                                       Option.apply("arch"),
+                                       TestingJobManager.class,
+                                       TestingMemoryArchivist.class);
+
+                       jobManager = new AkkaActorGateway(master._1(), null);
+                       archiver = new AkkaActorGateway(master._2(), null);
+
+                       ActorRef taskManagerRef = TaskManager.startTaskManagerComponentsAndActor(
+                                       config,
+                                       ResourceID.generate(),
+                                       actorSystem,
+                                       "localhost",
+                                       Option.apply("tm"),
+                                       Option.<LeaderRetrievalService>apply(new StandaloneLeaderRetrievalService(jobManager.path())),
+                                       true,
+                                       TestingTaskManager.class);
+
+                       taskManager = new AkkaActorGateway(taskManagerRef, null);
+
+                       // Wait until connected
+                       Object msg = new TestingTaskManagerMessages.NotifyWhenRegisteredAtJobManager(jobManager.actor());
+                       Await.ready(taskManager.ask(msg, timeout), timeout);
+
+                       // Create job graph
+                       JobVertex sourceVertex = new JobVertex("Source");
+                       sourceVertex.setInvokableClass(BlockingStatefulInvokable.class);
+                       sourceVertex.setParallelism(1);
+
+                       JobGraph jobGraph = new JobGraph("TestingJob", sourceVertex);
+
+                       JobSnapshottingSettings snapshottingSettings = new JobSnapshottingSettings(
+                                       Collections.singletonList(sourceVertex.getID()),
+                                       Collections.singletonList(sourceVertex.getID()),
+                                       Collections.singletonList(sourceVertex.getID()),
+                                       3600000,
+                                       3600000,
+                                       0,
+                                       Integer.MAX_VALUE,
+                                       ExternalizedCheckpointSettings.none());
+
+                       jobGraph.setSnapshotSettings(snapshottingSettings);
+
+                       // Submit job graph
+                       msg = new JobManagerMessages.SubmitJob(jobGraph, ListeningBehaviour.DETACHED);
+                       Await.result(jobManager.ask(msg, timeout), timeout);
+
+                       // Wait for all tasks to be running
+                       msg = new TestingJobManagerMessages.WaitForAllVerticesToBeRunning(jobGraph.getJobID());
+                       Await.result(jobManager.ask(msg, timeout), timeout);
+
+                       // Notify when cancelled
+                       msg = new NotifyWhenJobStatus(jobGraph.getJobID(), JobStatus.CANCELED);
+                       Future<Object> cancelled = jobManager.ask(msg, timeout);
+
+                       // Cancel with savepoint
+                       String savepointPath = null;
+
+                       for (int i = 0; i < 10; i++) {
+                               msg = new JobManagerMessages.CancelJobWithSavepoint(jobGraph.getJobID(), null);
+                               CancellationResponse cancelResp = (CancellationResponse) Await.result(jobManager.ask(msg, timeout), timeout);
+
+                               if (cancelResp instanceof CancellationFailure) {
+                                       CancellationFailure failure = (CancellationFailure) cancelResp;
+                                       if (failure.cause().getMessage().contains(CheckpointDeclineReason.NOT_ALL_REQUIRED_TASKS_RUNNING.message())) {
+                                               Thread.sleep(200); // wait and retry
+                                       } else {
+                                               failure.cause().printStackTrace();
+                                               fail("Failed to cancel job: " + failure.cause().getMessage());
+                                       }
+                               } else {
+                                       savepointPath = ((CancellationSuccess) cancelResp).savepointPath();
+                                       break;
+                               }
+                       }
+
+                       // Verify savepoint path
+                       assertNotEquals("Savepoint not triggered", null, savepointPath);
+
+                       // Wait for job status change
+                       Await.ready(cancelled, timeout);
+
+                       File savepointFile = new File(savepointPath);
+                       assertEquals(true, savepointFile.exists());
+               } finally {
+                       if (actorSystem != null) {
+                               actorSystem.shutdown();
+                       }
+
+                       if (archiver != null) {
+                               archiver.actor().tell(PoisonPill.getInstance(), ActorRef.noSender());
+                       }
+
+                       if (jobManager != null) {
+                               jobManager.actor().tell(PoisonPill.getInstance(), ActorRef.noSender());
+                       }
+
+                       if (taskManager != null) {
+                               taskManager.actor().tell(PoisonPill.getInstance(), ActorRef.noSender());
+                       }
+               }
+
+       }
 }
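The retry loop in `testCancelWithSavepoint` above doubles as the pattern a client needs around `CancelJobWithSavepoint`: re-send the message while the savepoint is declined for the transient "not all required tasks running" reason, and surface any other failure. A self-contained sketch of that loop, under the assumption that `CancelResult` and `SavepointCanceller.cancelWithRetry` are illustrative stand-ins (they are not Flink API; the real messages are `CancellationSuccess`/`CancellationFailure`):

```java
import java.util.function.Supplier;

// Illustrative result type (not Flink API): exactly one field is non-null.
class CancelResult {
    final String savepointPath;   // set on success (CancellationSuccess)
    final String failureMessage;  // set on failure (CancellationFailure)
    CancelResult(String savepointPath, String failureMessage) {
        this.savepointPath = savepointPath;
        this.failureMessage = failureMessage;
    }
}

class SavepointCanceller {
    /**
     * Re-asks the job manager while the failure is the transient
     * "not all required tasks running" decline; returns the savepoint
     * path on success, or null if every attempt was declined.
     */
    static String cancelWithRetry(Supplier<CancelResult> ask,
                                  String transientReason,
                                  int maxAttempts) {
        for (int i = 0; i < maxAttempts; i++) {
            CancelResult resp = ask.get();
            if (resp.savepointPath != null) {
                return resp.savepointPath; // job cancelled, savepoint written
            }
            if (!resp.failureMessage.contains(transientReason)) {
                throw new RuntimeException("Failed to cancel job: " + resp.failureMessage);
            }
            // transient decline: a real client would sleep briefly before retrying
        }
        return null;
    }
}
```

The key design point, visible in the test as well, is distinguishing the one retryable decline reason from all other failures, which must fail fast.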

http://git-wip-us.apache.org/repos/asf/flink/blob/5783671c/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
index fc2835d..74de942 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
@@ -64,6 +64,7 @@ import org.apache.flink.testutils.junit.RetryRule;
 import org.apache.flink.util.TestLogger;
 import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.Option;
@@ -100,6 +101,9 @@ public class SavepointITCase extends TestLogger {
        @Rule
        public RetryRule retryRule = new RetryRule();
 
+       @Rule
+       public TemporaryFolder folder = new TemporaryFolder();
+
        /**
         * Tests that it is possible to submit a job, trigger a savepoint, and
         * later restart the job on a new cluster. The savepoint is written to
