Repository: flink Updated Branches: refs/heads/master fd410d9f6 -> 5783671c2
[FLINK-4717] Add CancelJobWithSavepoint - Adds CancelJobWithSavepoint message, which triggers a savepoint before cancelling the respective job. - Adds -s [targetDirectory] option to CLI cancel command: * bin/flink cancel <jobID> (regular cancelling) * bin/flink cancel -s <jobID> (cancel with savepoint to default dir) * bin/flink cancel -s <targetDir> <jobID> (cancel with savepoint to targetDir) This closes #2609. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5783671c Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5783671c Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5783671c Branch: refs/heads/master Commit: 5783671c2f30228a2d5b5b7bf09b762ae41db8e2 Parents: fd410d9 Author: Ufuk Celebi <u...@apache.org> Authored: Fri Oct 7 11:48:47 2016 +0200 Committer: Ufuk Celebi <u...@apache.org> Committed: Fri Oct 14 10:07:09 2016 +0200 ---------------------------------------------------------------------- docs/setup/cli.md | 192 ++++++++++--------- .../org/apache/flink/client/CliFrontend.java | 55 +++++- .../apache/flink/client/cli/CancelOptions.java | 18 ++ .../flink/client/cli/CliFrontendParser.java | 14 +- .../flink/client/CliFrontendListCancelTest.java | 106 +++++++++- .../checkpoint/CheckpointCoordinator.java | 26 ++- .../checkpoint/CheckpointDeclineReason.java | 2 + .../runtime/checkpoint/PendingCheckpoint.java | 9 + .../flink/runtime/jobmanager/JobManager.scala | 56 ++++++ .../runtime/messages/JobManagerMessages.scala | 19 +- .../checkpoint/CheckpointCoordinatorTest.java | 89 ++++++--- .../checkpoint/CheckpointStateRestoreTest.java | 4 +- .../checkpoint/PendingCheckpointTest.java | 2 +- .../jobmanager/JobManagerHARecoveryTest.java | 11 +- .../runtime/jobmanager/JobManagerTest.java | 142 ++++++++++++++ .../test/checkpointing/SavepointITCase.java | 4 + 16 files changed, 607 insertions(+), 142 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/flink/blob/5783671c/docs/setup/cli.md ---------------------------------------------------------------------- diff --git a/docs/setup/cli.md b/docs/setup/cli.md index 251a0f6..0d3045e 100644 --- a/docs/setup/cli.md +++ b/docs/setup/cli.md @@ -113,6 +113,10 @@ The command line can be used to ./bin/flink cancel <jobID> +- Cancel a job with a savepoint: + + ./bin/flink cancel -s [targetDirectory] <jobID> + - Stop a job (streaming jobs only): ./bin/flink stop <jobID> @@ -144,7 +148,19 @@ Returns the path of the created savepoint. You need this path to restore and dis You can optionally specify a `savepointDirectory` when triggering the savepoint. If you don't specify one here, you need to configure a default savepoint directory for the Flink installation (see [[savepoint.html#configuration]]). -#### **Restore a savepoint** +##### Cancel with a savepoint + +You can atomically trigger a savepoint and cancel a job. + +{% highlight bash %} +./bin/flink cancel -s [savepointDirectory] <jobID> +{% endhighlight %} + +If no savepoint directory is configured, you need to configure a default savepoint directory for the Flink installation (see [[savepoint.html#configuration]]). + +The job will only be cancelled if the savepoint succeeds. + +#### Restore a savepoint {% highlight bash %} ./bin/flink run -s <savepointPath> ... @@ -152,7 +168,7 @@ You can optionally specify a `savepointDirectory` when triggering the savepoint. The run command has a savepoint flag to submit a job, which restores its state from a savepoint. The savepoint path is returned by the savepoint trigger command. -#### **Dispose a savepoint** +#### Dispose a savepoint {% highlight bash %} ./bin/flink savepoint -d <savepointPath> @@ -181,41 +197,51 @@ Action "run" compiles and runs a program. Syntax: run [OPTIONS] <jar-file> <arguments> "run" action options: - -c,--class <classname> Class with the program entry point - ("main" method or "getPlan()" method. 
- Only needed if the JAR file does not - specify the class in its manifest. - -C,--classpath <url> Adds a URL to each user code - classloader on all nodes in the - cluster. The paths must specify a - protocol (e.g. file://) and be - accessible on all nodes (e.g. by means - of a NFS share). You can use this - option multiple times for specifying - more than one URL. The protocol must - be supported by the {@link - java.net.URLClassLoader}. - -d,--detached If present, runs the job in detached - mode - -m,--jobmanager <host:port> Address of the JobManager (master) to - which to connect. Specify - 'yarn-cluster' as the JobManager to - deploy a YARN cluster for the job. Use - this flag to connect to a different - JobManager than the one specified in - the configuration. - -p,--parallelism <parallelism> The parallelism with which to run the - program. Optional flag to override the - default value specified in the - configuration. - -q,--sysoutLogging If present, supress logging output to - standard out. - -s,--fromSavepoint <savepointPath> Path to a savepoint to reset the job - back to (for example - file:///flink/savepoint-1537). - Additional arguments if -m yarn-cluster is set: + -c,--class <classname> Class with the program entry + point ("main" method or + "getPlan()" method. Only + needed if the JAR file does + not specify the class in its + manifest. + -C,--classpath <url> Adds a URL to each user code + classloader on all nodes in + the cluster. The paths must + specify a protocol (e.g. + file://) and be accessible + on all nodes (e.g. by means + of a NFS share). You can use + this option multiple times + for specifying more than one + URL. The protocol must be + supported by the {@link + java.net.URLClassLoader}. + -d,--detached If present, runs the job in + detached mode + -m,--jobmanager <host:port> Address of the JobManager + (master) to which to + connect. Use this flag to + connect to a different + JobManager than the one + specified in the + configuration. 
+ -p,--parallelism <parallelism> The parallelism with which + to run the program. Optional + flag to override the default + value specified in the + configuration. + -q,--sysoutLogging If present, suppress logging + output to standard out. + -s,--fromSavepoint <savepointPath> Path to a savepoint to reset + the job back to (for example + file:///flink/savepoint-1537 + ). + -z,--zookeeperNamespace <zookeeperNamespace> Namespace to create the + Zookeeper sub-paths for high + availability mode + Options for yarn-cluster mode: -yD <arg> Dynamic properties -yd,--yarndetached Start detached + -yid,--yarnapplicationId <arg> Attach to running YARN session -yj,--yarnjar <arg> Path to Flink jar file -yjm,--yarnjobManagerMemory <arg> Memory for JobManager Container [in MB] @@ -232,6 +258,9 @@ Action "run" compiles and runs a program. (t for transfer) -ytm,--yarntaskManagerMemory <arg> Memory per TaskManager Container [in MB] + -yz,--yarnzookeeperNamespace <arg> Namespace to create the Zookeeper + sub-paths for high availability mode + Action "info" shows the optimized execution plan of the program (JSON). @@ -242,16 +271,13 @@ Action "info" shows the optimized execution plan of the program (JSON). method or "getPlan()" method. Only needed if the JAR file does not specify the class in its manifest. - -m,--jobmanager <host:port> Address of the JobManager (master) to - which to connect. Specify 'yarn-cluster' - as the JobManager to deploy a YARN cluster - for the job. Use this flag to connect to a - different JobManager than the one - specified in the configuration. -p,--parallelism <parallelism> The parallelism with which to run the program. Optional flag to override the default value specified in the configuration. + Options for yarn-cluster mode: + -yid,--yarnapplicationId <arg> Attach to running YARN session + Action "list" lists running and scheduled programs. @@ -259,41 +285,17 @@ Action "list" lists running and scheduled programs. 
Syntax: list [OPTIONS] "list" action options: -m,--jobmanager <host:port> Address of the JobManager (master) to which - to connect. Specify 'yarn-cluster' as the - JobManager to deploy a YARN cluster for the - job. Use this flag to connect to a different - JobManager than the one specified in the - configuration. + to connect. Use this flag to connect to a + different JobManager than the one specified + in the configuration. -r,--running Show only running programs and their JobIDs -s,--scheduled Show only scheduled programs and their JobIDs - Additional arguments if -m yarn-cluster is set: - -yid <yarnApplicationId> YARN application ID of Flink YARN session to - connect to. Must not be set if JobManager HA - is used. In this case, JobManager RPC - location is automatically retrieved from - Zookeeper. + Options for yarn-cluster mode: + -yid,--yarnapplicationId <arg> Attach to running YARN session -Action "cancel" cancels a running program. - Syntax: cancel [OPTIONS] <Job ID> - "cancel" action options: - -m,--jobmanager <host:port> Address of the JobManager (master) to which - to connect. Specify 'yarn-cluster' as the - JobManager to deploy a YARN cluster for the - job. Use this flag to connect to a different - JobManager than the one specified in the - configuration. - Additional arguments if -m yarn-cluster is set: - -yid <yarnApplicationId> YARN application ID of Flink YARN session to - connect to. Must not be set if JobManager HA - is used. In this case, JobManager RPC - location is automatically retrieved from - Zookeeper. - - -Action "stop" stops a running program (streaming jobs only). There are no strong consistency -guarantees for a stop request. +Action "stop" stops a running program (streaming jobs only). Syntax: stop [OPTIONS] <Job ID> "stop" action options: @@ -301,24 +303,40 @@ guarantees for a stop request. to connect. Use this flag to connect to a different JobManager than the one specified in the configuration. 
- Additional arguments if -m yarn-cluster is set: - -yid <yarnApplicationId> YARN application ID of Flink YARN session to - connect to. Must not be set if JobManager HA - is used. In this case, JobManager RPC - location is automatically retrieved from - Zookeeper. + Options for yarn-cluster mode: + -yid,--yarnapplicationId <arg> Attach to running YARN session + + + +Action "cancel" cancels a running program. + + Syntax: cancel [OPTIONS] <Job ID> + "cancel" action options: + -m,--jobmanager <host:port> Address of the JobManager (master) + to which to connect. Use this flag + to connect to a different JobManager + than the one specified in the + configuration. + -s,--withSavepoint <targetDirectory> Trigger savepoint and cancel job. + The target directory is optional. If + no directory is specified, the + configured default directory + (state.savepoints.dir) is used. + Options for yarn-cluster mode: + -yid,--yarnapplicationId <arg> Attach to running YARN session + Action "savepoint" triggers savepoints for a running job or disposes existing ones. - Syntax: savepoint [OPTIONS] <Job ID> - "savepoint" action options: - -d,--dispose <arg> Path of savepoint to dispose. - -j,--jarfile <jarfile> Flink program JAR file. - -m,--jobmanager <host:port> Address of the JobManager (master) to which - to connect. Use this flag to connect to a - different JobManager than the one specified - in the configuration. - Options for yarn-cluster mode: - -yid,--yarnapplicationId <arg> Attach to running YARN session + Syntax: savepoint [OPTIONS] <Job ID> [<target directory>] + "savepoint" action options: + -d,--dispose <arg> Path of savepoint to dispose. + -j,--jarfile <jarfile> Flink program JAR file. + -m,--jobmanager <host:port> Address of the JobManager (master) to which + to connect. Use this flag to connect to a + different JobManager than the one specified + in the configuration. 
+ Options for yarn-cluster mode: + -yid,--yarnapplicationId <arg> Attach to running YARN session ~~~ http://git-wip-us.apache.org/repos/asf/flink/blob/5783671c/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java index 90d3437..0572dc6 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java +++ b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java @@ -60,7 +60,9 @@ import org.apache.flink.runtime.instance.ActorGateway; import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.messages.JobManagerMessages; import org.apache.flink.runtime.messages.JobManagerMessages.CancelJob; +import org.apache.flink.runtime.messages.JobManagerMessages.CancelJobWithSavepoint; import org.apache.flink.runtime.messages.JobManagerMessages.CancellationFailure; +import org.apache.flink.runtime.messages.JobManagerMessages.CancellationSuccess; import org.apache.flink.runtime.messages.JobManagerMessages.RunningJobsStatus; import org.apache.flink.runtime.messages.JobManagerMessages.StopJob; import org.apache.flink.runtime.messages.JobManagerMessages.StoppingFailure; @@ -556,20 +558,38 @@ public class CliFrontend { } String[] cleanedArgs = options.getArgs(); + + boolean withSavepoint = options.isWithSavepoint(); + String targetDirectory = options.getSavepointTargetDirectory(); + JobID jobId; + // Figure out jobID. 
This is a little overly complicated, because + // we have to figure out whether the optional target directory + // is set: + // - cancel -s <jobID> => default target dir (JobID parsed as opt arg) + // - cancel -s <targetDir> <jobID> => custom target dir (parsed correctly) if (cleanedArgs.length > 0) { String jobIdString = cleanedArgs[0]; try { jobId = new JobID(StringUtils.hexStringToByte(jobIdString)); - } - catch (Exception e) { + } catch (Exception e) { LOG.error("Error: The value for the Job ID is not a valid ID."); System.out.println("Error: The value for the Job ID is not a valid ID."); return 1; } - } - else { + } else if (targetDirectory != null) { + // Try this for case: cancel -s <jobID> (default savepoint target dir) + String jobIdString = targetDirectory; + try { + jobId = new JobID(StringUtils.hexStringToByte(jobIdString)); + targetDirectory = null; + } catch (Exception e) { + LOG.error("Missing JobID in the command line arguments."); + System.out.println("Error: Specify a Job ID to cancel a job."); + return 1; + } + } else { LOG.error("Missing JobID in the command line arguments."); System.out.println("Error: Specify a Job ID to cancel a job."); return 1; @@ -577,13 +597,36 @@ public class CliFrontend { try { ActorGateway jobManager = getJobManagerGateway(options); - Future<Object> response = jobManager.ask(new CancelJob(jobId), clientTimeout); + Object cancelMsg; + if (withSavepoint) { + if (targetDirectory == null) { + logAndSysout("Cancelling job " + jobId + " with savepoint to default savepoint directory."); + } else { + logAndSysout("Cancelling job " + jobId + " with savepoint to " + targetDirectory + "."); + } + cancelMsg = new CancelJobWithSavepoint(jobId, targetDirectory); + } else { + logAndSysout("Cancelling job " + jobId + "."); + cancelMsg = new CancelJob(jobId); + } + + Future<Object> response = jobManager.ask(cancelMsg, clientTimeout); final Object rc = Await.result(response, clientTimeout); - if (rc instanceof CancellationFailure) { + 
if (rc instanceof CancellationSuccess) { + if (withSavepoint) { + CancellationSuccess success = (CancellationSuccess) rc; + String savepointPath = success.savepointPath(); + logAndSysout("Cancelled job " + jobId + ". Savepoint stored in " + savepointPath + "."); + } else { + logAndSysout("Cancelled job " + jobId + "."); + } + } else if (rc instanceof CancellationFailure) { throw new Exception("Canceling the job with ID " + jobId + " failed.", ((CancellationFailure) rc).cause()); + } else { + throw new IllegalStateException("Unexpected response: " + rc); } return 0; http://git-wip-us.apache.org/repos/asf/flink/blob/5783671c/flink-clients/src/main/java/org/apache/flink/client/cli/CancelOptions.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CancelOptions.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CancelOptions.java index 22e9ece..54e1a23 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/cli/CancelOptions.java +++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CancelOptions.java @@ -19,6 +19,8 @@ package org.apache.flink.client.cli; import org.apache.commons.cli.CommandLine; +import static org.apache.flink.client.cli.CliFrontendParser.CANCEL_WITH_SAVEPOINT_OPTION; + /** * Command line options for the CANCEL command */ @@ -26,12 +28,28 @@ public class CancelOptions extends CommandLineOptions { private final String[] args; + /** Flag indicating whether to cancel with a savepoint. */ + private final boolean withSavepoint; + + /** Optional target directory for the savepoint. Overwrites cluster default. 
*/ + private final String targetDirectory; + public CancelOptions(CommandLine line) { super(line); this.args = line.getArgs(); + this.withSavepoint = line.hasOption(CANCEL_WITH_SAVEPOINT_OPTION.getOpt()); + this.targetDirectory = line.getOptionValue(CANCEL_WITH_SAVEPOINT_OPTION.getOpt()); } public String[] getArgs() { return args == null ? new String[0] : args; } + + public boolean isWithSavepoint() { + return withSavepoint; + } + + public String getSavepointTargetDirectory() { + return targetDirectory; + } } http://git-wip-us.apache.org/repos/asf/flink/blob/5783671c/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java index 9f3ef63..2527a9c 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java +++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java @@ -24,6 +24,7 @@ import org.apache.commons.cli.Option; import org.apache.commons.cli.Options; import org.apache.commons.cli.ParseException; import org.apache.flink.client.CliFrontend; +import org.apache.flink.configuration.ConfigConstants; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -36,7 +37,6 @@ public class CliFrontendParser { private static final Logger LOG = LoggerFactory.getLogger(CliFrontendParser.class); - static final Option HELP_OPTION = new Option("h", "help", false, "Show the help message for the CLI Frontend or the action."); @@ -85,6 +85,11 @@ public class CliFrontendParser { static final Option ZOOKEEPER_NAMESPACE_OPTION = new Option("z", "zookeeperNamespace", true, "Namespace to create the Zookeeper sub-paths for high availability mode"); + static final Option CANCEL_WITH_SAVEPOINT_OPTION = new Option( + "s", "withSavepoint", true, "Trigger savepoint and 
cancel job. The target " + + "directory is optional. If no directory is specified, the configured default " + + "directory (" + ConfigConstants.SAVEPOINT_DIRECTORY_KEY + ") is used."); + static { HELP_OPTION.setRequired(false); @@ -118,6 +123,10 @@ public class CliFrontendParser { ZOOKEEPER_NAMESPACE_OPTION.setRequired(false); ZOOKEEPER_NAMESPACE_OPTION.setArgName("zookeeperNamespace"); + + CANCEL_WITH_SAVEPOINT_OPTION.setRequired(false); + CANCEL_WITH_SAVEPOINT_OPTION.setArgName("targetDirectory"); + CANCEL_WITH_SAVEPOINT_OPTION.setOptionalArg(true); } private static final Options RUN_OPTIONS = getRunOptions(buildGeneralOptions(new Options())); @@ -188,6 +197,7 @@ public class CliFrontendParser { } private static Options getCancelOptions(Options options) { + options.addOption(CANCEL_WITH_SAVEPOINT_OPTION); options = getJobManagerAddressOption(options); return addCustomCliOptions(options, false); } @@ -213,7 +223,6 @@ public class CliFrontendParser { return getJobManagerAddressOption(o); } - private static Options getInfoOptionsWithoutDeprecatedOptions(Options options) { options.addOption(CLASS_OPTION); options.addOption(PARALLELISM_OPTION); @@ -228,6 +237,7 @@ public class CliFrontendParser { } private static Options getCancelOptionsWithoutDeprecatedOptions(Options options) { + options.addOption(CANCEL_WITH_SAVEPOINT_OPTION); options = getJobManagerAddressOption(options); return options; } http://git-wip-us.apache.org/repos/asf/flink/blob/5783671c/flink-clients/src/test/java/org/apache/flink/client/CliFrontendListCancelTest.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendListCancelTest.java b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendListCancelTest.java index 524e7e7..53311ef 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendListCancelTest.java +++ 
b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendListCancelTest.java @@ -18,14 +18,16 @@ package org.apache.flink.client; -import akka.actor.*; +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.actor.Props; +import akka.actor.Status; import akka.testkit.JavaTestKit; - -import org.apache.flink.client.cli.CommandLineOptions; import org.apache.flink.api.common.JobID; +import org.apache.flink.client.cli.CommandLineOptions; import org.apache.flink.runtime.akka.FlinkUntypedActor; -import org.apache.flink.runtime.instance.AkkaActorGateway; import org.apache.flink.runtime.instance.ActorGateway; +import org.apache.flink.runtime.instance.AkkaActorGateway; import org.apache.flink.runtime.messages.JobManagerMessages; import org.junit.AfterClass; import org.junit.BeforeClass; @@ -34,7 +36,10 @@ import org.junit.Test; import java.util.UUID; import static org.apache.flink.client.CliFrontendTestUtils.pipeSystemOutToNull; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; public class CliFrontendListCancelTest { @@ -128,6 +133,68 @@ public class CliFrontendListCancelTest { } } + /** + * Tests cancelling with the savepoint option. 
+ */ + @Test + public void testCancelWithSavepoint() throws Exception { + { + // Cancel with savepoint (no target directory) + JobID jid = new JobID(); + UUID leaderSessionID = UUID.randomUUID(); + + Props props = Props.create(CliJobManager.class, jid, leaderSessionID); + ActorRef jm = actorSystem.actorOf(props); + ActorGateway gateway = new AkkaActorGateway(jm, leaderSessionID); + + String[] parameters = { "-s", jid.toString() }; + InfoListTestCliFrontend testFrontend = new InfoListTestCliFrontend(gateway); + assertEquals(0, testFrontend.cancel(parameters)); + } + + { + // Cancel with savepoint (with target directory) + JobID jid = new JobID(); + UUID leaderSessionID = UUID.randomUUID(); + + Props props = Props.create(CliJobManager.class, jid, leaderSessionID, "targetDirectory"); + ActorRef jm = actorSystem.actorOf(props); + ActorGateway gateway = new AkkaActorGateway(jm, leaderSessionID); + + String[] parameters = { "-s", "targetDirectory", jid.toString() }; + InfoListTestCliFrontend testFrontend = new InfoListTestCliFrontend(gateway); + assertEquals(0, testFrontend.cancel(parameters)); + } + + { + // Cancel with savepoint (with target directory), but no job ID + JobID jid = new JobID(); + UUID leaderSessionID = UUID.randomUUID(); + + Props props = Props.create(CliJobManager.class, jid, leaderSessionID, "targetDirectory"); + ActorRef jm = actorSystem.actorOf(props); + ActorGateway gateway = new AkkaActorGateway(jm, leaderSessionID); + + String[] parameters = { "-s", "targetDirectory" }; + InfoListTestCliFrontend testFrontend = new InfoListTestCliFrontend(gateway); + assertNotEquals(0, testFrontend.cancel(parameters)); + } + + { + // Cancel with savepoint (no target directory)and no job ID + JobID jid = new JobID(); + UUID leaderSessionID = UUID.randomUUID(); + + Props props = Props.create(CliJobManager.class, jid, leaderSessionID); + ActorRef jm = actorSystem.actorOf(props); + ActorGateway gateway = new AkkaActorGateway(jm, leaderSessionID); + + String[] 
parameters = { "-s" }; + InfoListTestCliFrontend testFrontend = new InfoListTestCliFrontend(gateway); + assertNotEquals(0, testFrontend.cancel(parameters)); + } + } + @Test public void testList() { try { @@ -182,10 +249,16 @@ public class CliFrontendListCancelTest { protected static final class CliJobManager extends FlinkUntypedActor { private final JobID jobID; private final UUID leaderSessionID; + private final String targetDirectory; public CliJobManager(final JobID jobID, final UUID leaderSessionID){ + this(jobID, leaderSessionID, null); + } + + public CliJobManager(final JobID jobID, final UUID leaderSessionID, String targetDirectory){ this.jobID = jobID; this.leaderSessionID = leaderSessionID; + this.targetDirectory = targetDirectory; } @Override @@ -198,7 +271,7 @@ public class CliFrontendListCancelTest { if (jobID != null && jobID.equals(cancelJob.jobID())) { getSender().tell( - decorateMessage(new Status.Success(new Object())), + decorateMessage(new Status.Success(new JobManagerMessages.CancellationSuccess(jobID, null))), getSelf()); } else { @@ -207,6 +280,27 @@ public class CliFrontendListCancelTest { getSelf()); } } + else if (message instanceof JobManagerMessages.CancelJobWithSavepoint) { + JobManagerMessages.CancelJobWithSavepoint cancelJob = (JobManagerMessages.CancelJobWithSavepoint) message; + + if (jobID != null && jobID.equals(cancelJob.jobID())) { + if (targetDirectory == null && cancelJob.savepointDirectory() == null || + targetDirectory != null && targetDirectory.equals(cancelJob.savepointDirectory())) { + getSender().tell( + decorateMessage(new JobManagerMessages.CancellationSuccess(jobID, targetDirectory)), + getSelf()); + } else { + getSender().tell( + decorateMessage(new JobManagerMessages.CancellationFailure(jobID, new Exception("Wrong target directory"))), + getSelf()); + } + } + else { + getSender().tell( + decorateMessage(new JobManagerMessages.CancellationFailure(jobID, new Exception("Wrong or no JobID"))), + getSelf()); + } + } else 
if (message instanceof JobManagerMessages.RequestRunningJobsStatus$) { getSender().tell( decorateMessage(new JobManagerMessages.RunningJobsStatus()), http://git-wip-us.apache.org/repos/asf/flink/blob/5783671c/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java index ab4bde7..00028c4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java @@ -287,7 +287,7 @@ public class CheckpointCoordinator { checkNotNull(targetDirectory, "Savepoint target directory"); CheckpointProperties props = CheckpointProperties.forStandardSavepoint(); - CheckpointTriggerResult result = triggerCheckpoint(timestamp, props, targetDirectory); + CheckpointTriggerResult result = triggerCheckpoint(timestamp, props, targetDirectory, false); if (result.isSuccess()) { return result.getPendingCheckpoint().getCompletionFuture(); @@ -303,13 +303,21 @@ public class CheckpointCoordinator { * timestamp. * * @param timestamp The timestamp for the checkpoint. + * @param isPeriodic Flag indicating whether this triggered checkpoint is + * periodic. If this flag is true, but the periodic scheduler is disabled, + * the checkpoint will be declined. * @return <code>true</code> if triggering the checkpoint succeeded. 
*/ - public boolean triggerCheckpoint(long timestamp) throws Exception { - return triggerCheckpoint(timestamp, checkpointProperties, checkpointDirectory).isSuccess(); + public boolean triggerCheckpoint(long timestamp, boolean isPeriodic) throws Exception { + return triggerCheckpoint(timestamp, checkpointProperties, checkpointDirectory, isPeriodic).isSuccess(); } - CheckpointTriggerResult triggerCheckpoint(long timestamp, CheckpointProperties props, String targetDirectory) throws Exception { + CheckpointTriggerResult triggerCheckpoint( + long timestamp, + CheckpointProperties props, + String targetDirectory, + boolean isPeriodic) throws Exception { + // Sanity check if (props.externalizeCheckpoint() && targetDirectory == null) { throw new IllegalStateException("No target directory specified to persist checkpoint to."); @@ -322,6 +330,11 @@ public class CheckpointCoordinator { return new CheckpointTriggerResult(CheckpointDeclineReason.COORDINATOR_SHUTDOWN); } + // Don't allow periodic checkpoint if scheduling has been disabled + if (isPeriodic && !periodicScheduling) { + return new CheckpointTriggerResult(CheckpointDeclineReason.PERIODIC_SCHEDULER_SHUTDOWN); + } + // validate whether the checkpoint can be triggered, with respect to the limit of // concurrent checkpoints, and the minimum time between checkpoints. 
// these checks are not relevant for savepoints @@ -417,6 +430,7 @@ public class CheckpointCoordinator { checkpointID, timestamp, ackTasks, + isPeriodic, props, targetDirectory); @@ -580,7 +594,7 @@ public class CheckpointCoordinator { } if (!haveMoreRecentPending && !triggerRequestQueued) { LOG.info("Triggering new checkpoint because of discarded checkpoint " + checkpointId); - triggerCheckpoint(System.currentTimeMillis(), checkpoint.getProps(), checkpoint.getTargetDirectory()); + triggerCheckpoint(System.currentTimeMillis(), checkpoint.getProps(), checkpoint.getTargetDirectory(), checkpoint.isPeriodic()); } else if (!haveMoreRecentPending) { LOG.info("Promoting queued checkpoint request because of discarded checkpoint " + checkpointId); triggerQueuedRequests(); @@ -1084,7 +1098,7 @@ public class CheckpointCoordinator { @Override public void run() { try { - triggerCheckpoint(System.currentTimeMillis()); + triggerCheckpoint(System.currentTimeMillis(), true); } catch (Exception e) { LOG.error("Exception while triggering checkpoint", e); http://git-wip-us.apache.org/repos/asf/flink/blob/5783671c/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointDeclineReason.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointDeclineReason.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointDeclineReason.java index 2cc9094..60fe657 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointDeclineReason.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointDeclineReason.java @@ -25,6 +25,8 @@ public enum CheckpointDeclineReason { COORDINATOR_SHUTDOWN("Checkpoint coordinator is shut down."), + PERIODIC_SCHEDULER_SHUTDOWN("Periodic checkpoint scheduler is shut down."), + ALREADY_QUEUED("Another checkpoint request has already been queued."), 
TOO_MANY_CONCURRENT_CHECKPOINTS("The maximum number of concurrent checkpoints is exceeded"), http://git-wip-us.apache.org/repos/asf/flink/blob/5783671c/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java index 983f1d7..6f50392 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java @@ -67,6 +67,9 @@ public class PendingCheckpoint { private final Map<ExecutionAttemptID, ExecutionVertex> notYetAcknowledgedTasks; + /** Flag indicating whether the checkpoint is triggered as part of periodic scheduling. */ + private final boolean isPeriodic; + /** * The checkpoint properties. If the checkpoint should be persisted * externally, it happens in {@link #finalizeCheckpoint()}. @@ -90,12 +93,14 @@ public class PendingCheckpoint { long checkpointId, long checkpointTimestamp, Map<ExecutionAttemptID, ExecutionVertex> verticesToConfirm, + boolean isPeriodic, CheckpointProperties props, String targetDirectory) { this.jobId = checkNotNull(jobId); this.checkpointId = checkpointId; this.checkpointTimestamp = checkpointTimestamp; this.notYetAcknowledgedTasks = checkNotNull(verticesToConfirm); + this.isPeriodic = isPeriodic; this.taskStates = new HashMap<>(); this.props = checkNotNull(props); this.targetDirectory = targetDirectory; @@ -147,6 +152,10 @@ public class PendingCheckpoint { return discarded; } + boolean isPeriodic() { + return isPeriodic; + } + /** * Checks whether this checkpoint can be subsumed or whether it should always continue, regardless * of newer checkpoints in progress. 
http://git-wip-us.apache.org/repos/asf/flink/blob/5783671c/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala index 450e810..cca0124 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala @@ -578,6 +578,62 @@ class JobManager( ) } + case CancelJobWithSavepoint(jobId, savepointDirectory) => + try { + val targetDirectory = if (savepointDirectory != null) { + savepointDirectory + } else { + defaultSavepointDir + } + + log.info(s"Trying to cancel job $jobId with savepoint to $targetDirectory") + + currentJobs.get(jobId) match { + case Some((executionGraph, _)) => + // We don't want any checkpoint between the savepoint and cancellation + val coord = executionGraph.getCheckpointCoordinator + coord.stopCheckpointScheduler() + + // Trigger the savepoint + val future = coord.triggerSavepoint(System.currentTimeMillis(), targetDirectory) + + val senderRef = sender() + future.handleAsync[Void]( + new BiFunction[CompletedCheckpoint, Throwable, Void] { + override def apply(success: CompletedCheckpoint, cause: Throwable): Void = { + if (success != null) { + val path = success.getExternalPath() + log.info(s"Savepoint stored in $path. Now cancelling $jobId.") + executionGraph.cancel() + senderRef ! decorateMessage(CancellationSuccess(jobId, path)) + } else { + val msg = CancellationFailure( + jobId, + new Exception("Failed to trigger savepoint.", cause)) + senderRef ! decorateMessage(msg) + } + null + } + }, + context.dispatcher) + + case None => + log.info(s"No job found with ID $jobId.") + sender ! 
decorateMessage( + CancellationFailure( + jobId, + new IllegalArgumentException(s"No job found with ID $jobId.")) + ) + } + } catch { + case t: Throwable => + log.info(s"Failure during cancellation of job $jobId with savepoint.", t) + sender ! decorateMessage( + CancellationFailure( + jobId, + new Exception(s"Failed to cancel job $jobId with savepoint.", t))) + } + case StopJob(jobID) => log.info(s"Trying to stop job with ID $jobID.") http://git-wip-us.apache.org/repos/asf/flink/blob/5783671c/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala index fd45cda..4cf6a02 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala @@ -111,6 +111,21 @@ object JobManagerMessages { case class CancelJob(jobID: JobID) extends RequiresLeaderSessionID /** + * Cancels the job with the given [[jobID]] at the JobManager. Before cancellation a savepoint + * is triggered without any other checkpoints in between. The result of the cancellation is + * the path of the triggered savepoint on success or an exception. + * + * @param jobID ID of the job to cancel + * @param savepointDirectory Optional target directory for the savepoint. + * If no target directory is specified here, the + * cluster default is used. + */ + case class CancelJobWithSavepoint( + jobID: JobID, + savepointDirectory: String = null) + extends RequiresLeaderSessionID + + /** * Stops a (streaming) job with the given [[jobID]] at the JobManager. The result of * stopping is sent back to the sender as a [[StoppingResponse]] message. 
* @@ -280,7 +295,9 @@ object JobManagerMessages { * Denotes a successful job cancellation * @param jobID */ - case class CancellationSuccess(jobID: JobID) extends CancellationResponse + case class CancellationSuccess( + jobID: JobID, + savepointPath: String = null) extends CancellationResponse /** * Denotes a failed job cancellation http://git-wip-us.apache.org/repos/asf/flink/blob/5783671c/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java index 7b0e819..2a20c6c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java @@ -135,7 +135,7 @@ public class CheckpointCoordinatorTest { assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints()); // trigger the first checkpoint. this should not succeed - assertFalse(coord.triggerCheckpoint(timestamp)); + assertFalse(coord.triggerCheckpoint(timestamp, false)); // still, nothing should be happening assertEquals(0, coord.getNumberOfPendingCheckpoints()); @@ -188,7 +188,7 @@ public class CheckpointCoordinatorTest { assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints()); // trigger the first checkpoint. this should not succeed - assertFalse(coord.triggerCheckpoint(timestamp)); + assertFalse(coord.triggerCheckpoint(timestamp, false)); // still, nothing should be happening assertEquals(0, coord.getNumberOfPendingCheckpoints()); @@ -239,7 +239,7 @@ public class CheckpointCoordinatorTest { assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints()); // trigger the first checkpoint. 
this should not succeed - assertFalse(coord.triggerCheckpoint(timestamp)); + assertFalse(coord.triggerCheckpoint(timestamp, false)); // still, nothing should be happening assertEquals(0, coord.getNumberOfPendingCheckpoints()); @@ -290,7 +290,7 @@ public class CheckpointCoordinatorTest { assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints()); // trigger the first checkpoint. this should succeed - assertTrue(coord.triggerCheckpoint(timestamp)); + assertTrue(coord.triggerCheckpoint(timestamp, false)); // validate that we have a pending checkpoint assertEquals(1, coord.getNumberOfPendingCheckpoints()); @@ -417,10 +417,10 @@ public class CheckpointCoordinatorTest { assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints()); // trigger the first checkpoint. this should succeed - assertTrue(coord.triggerCheckpoint(timestamp)); + assertTrue(coord.triggerCheckpoint(timestamp, false)); // trigger second checkpoint, should also succeed - assertTrue(coord.triggerCheckpoint(timestamp + 2)); + assertTrue(coord.triggerCheckpoint(timestamp + 2, false)); // validate that we have a pending checkpoint assertEquals(2, coord.getNumberOfPendingCheckpoints()); @@ -538,7 +538,7 @@ public class CheckpointCoordinatorTest { assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints()); // trigger the first checkpoint. 
this should succeed - assertTrue(coord.triggerCheckpoint(timestamp)); + assertTrue(coord.triggerCheckpoint(timestamp, false)); // validate that we have a pending checkpoint assertEquals(1, coord.getNumberOfPendingCheckpoints()); @@ -608,7 +608,7 @@ public class CheckpointCoordinatorTest { // trigger another checkpoint and see that this one replaces the other checkpoint // --------------- final long timestampNew = timestamp + 7; - coord.triggerCheckpoint(timestampNew); + coord.triggerCheckpoint(timestampNew, false); long checkpointIdNew = coord.getPendingCheckpoints().entrySet().iterator().next().getKey(); CheckpointMetaData checkpointMetaDataNew = new CheckpointMetaData(checkpointIdNew, 0L); @@ -692,7 +692,7 @@ public class CheckpointCoordinatorTest { assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints()); // trigger the first checkpoint. this should succeed - assertTrue(coord.triggerCheckpoint(timestamp1)); + assertTrue(coord.triggerCheckpoint(timestamp1, false)); assertEquals(1, coord.getNumberOfPendingCheckpoints()); assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints()); @@ -713,7 +713,7 @@ public class CheckpointCoordinatorTest { // start the second checkpoint // trigger the first checkpoint. this should succeed - assertTrue(coord.triggerCheckpoint(timestamp2)); + assertTrue(coord.triggerCheckpoint(timestamp2, false)); assertEquals(2, coord.getNumberOfPendingCheckpoints()); assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints()); @@ -832,7 +832,7 @@ public class CheckpointCoordinatorTest { assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints()); // trigger the first checkpoint. 
this should succeed - assertTrue(coord.triggerCheckpoint(timestamp1)); + assertTrue(coord.triggerCheckpoint(timestamp1, false)); assertEquals(1, coord.getNumberOfPendingCheckpoints()); assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints()); @@ -853,7 +853,7 @@ public class CheckpointCoordinatorTest { // start the second checkpoint // trigger the first checkpoint. this should succeed - assertTrue(coord.triggerCheckpoint(timestamp2)); + assertTrue(coord.triggerCheckpoint(timestamp2, false)); assertEquals(2, coord.getNumberOfPendingCheckpoints()); assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints()); @@ -955,7 +955,7 @@ public class CheckpointCoordinatorTest { new DisabledCheckpointStatsTracker()); // trigger a checkpoint, partially acknowledged - assertTrue(coord.triggerCheckpoint(timestamp)); + assertTrue(coord.triggerCheckpoint(timestamp, false)); assertEquals(1, coord.getNumberOfPendingCheckpoints()); PendingCheckpoint checkpoint = coord.getPendingCheckpoints().values().iterator().next(); @@ -1023,7 +1023,7 @@ public class CheckpointCoordinatorTest { null, new DisabledCheckpointStatsTracker()); - assertTrue(coord.triggerCheckpoint(timestamp)); + assertTrue(coord.triggerCheckpoint(timestamp, false)); long checkpointId = coord.getPendingCheckpoints().keySet().iterator().next(); @@ -1431,10 +1431,10 @@ public class CheckpointCoordinatorTest { CheckpointMetaData checkpointMetaDataS1 = new CheckpointMetaData(savepointId1, 0L); assertEquals(1, coord.getNumberOfPendingCheckpoints()); - assertTrue(coord.triggerCheckpoint(timestamp + 1)); + assertTrue(coord.triggerCheckpoint(timestamp + 1, false)); assertEquals(2, coord.getNumberOfPendingCheckpoints()); - assertTrue(coord.triggerCheckpoint(timestamp + 2)); + assertTrue(coord.triggerCheckpoint(timestamp + 2, false)); long checkpointId2 = counter.getLast(); assertEquals(3, coord.getNumberOfPendingCheckpoints()); @@ -1450,7 +1450,7 @@ public class CheckpointCoordinatorTest { 
assertFalse(coord.getPendingCheckpoints().get(savepointId1).isDiscarded()); assertFalse(savepointFuture1.isDone()); - assertTrue(coord.triggerCheckpoint(timestamp + 3)); + assertTrue(coord.triggerCheckpoint(timestamp + 3, false)); assertEquals(2, coord.getNumberOfPendingCheckpoints()); Future<CompletedCheckpoint> savepointFuture2 = coord.triggerSavepoint(timestamp + 4, savepointDir); @@ -1841,7 +1841,7 @@ public class CheckpointCoordinatorTest { new DisabledCheckpointStatsTracker()); // trigger the checkpoint - coord.triggerCheckpoint(timestamp); + coord.triggerCheckpoint(timestamp, false); assertTrue(coord.getPendingCheckpoints().keySet().size() == 1); long checkpointId = Iterables.getOnlyElement(coord.getPendingCheckpoints().keySet()); @@ -1946,7 +1946,7 @@ public class CheckpointCoordinatorTest { new DisabledCheckpointStatsTracker()); // trigger the checkpoint - coord.triggerCheckpoint(timestamp); + coord.triggerCheckpoint(timestamp, false); assertTrue(coord.getPendingCheckpoints().keySet().size() == 1); long checkpointId = Iterables.getOnlyElement(coord.getPendingCheckpoints().keySet()); @@ -2061,7 +2061,7 @@ public class CheckpointCoordinatorTest { new DisabledCheckpointStatsTracker()); // trigger the checkpoint - coord.triggerCheckpoint(timestamp); + coord.triggerCheckpoint(timestamp, false); assertTrue(coord.getPendingCheckpoints().keySet().size() == 1); long checkpointId = Iterables.getOnlyElement(coord.getPendingCheckpoints().keySet()); @@ -2184,7 +2184,7 @@ public class CheckpointCoordinatorTest { new DisabledCheckpointStatsTracker()); // trigger the checkpoint - coord.triggerCheckpoint(timestamp); + coord.triggerCheckpoint(timestamp, false); assertTrue(coord.getPendingCheckpoints().keySet().size() == 1); long checkpointId = Iterables.getOnlyElement(coord.getPendingCheckpoints().keySet()); @@ -2298,7 +2298,7 @@ public class CheckpointCoordinatorTest { "fake-directory", new DisabledCheckpointStatsTracker()); - 
assertTrue(coord.triggerCheckpoint(timestamp)); + assertTrue(coord.triggerCheckpoint(timestamp, false)); for (PendingCheckpoint checkpoint : coord.getPendingCheckpoints().values()) { CheckpointProperties props = checkpoint.getProps(); @@ -2654,6 +2654,48 @@ public class CheckpointCoordinatorTest { } } + @Test + public void testStopPeriodicScheduler() throws Exception { + // create some mock Execution vertices that receive the checkpoint trigger messages + final ExecutionAttemptID attemptID1 = new ExecutionAttemptID(); + ExecutionVertex vertex1 = mockExecutionVertex(attemptID1); + + // set up the coordinator and validate the initial state + CheckpointCoordinator coord = new CheckpointCoordinator( + new JobID(), + 600000, + 600000, + 0, + Integer.MAX_VALUE, + ExternalizedCheckpointSettings.none(), + new ExecutionVertex[] { vertex1 }, + new ExecutionVertex[] { vertex1 }, + new ExecutionVertex[] { vertex1 }, + new StandaloneCheckpointIDCounter(), + new StandaloneCompletedCheckpointStore(1), + null, + new DisabledCheckpointStatsTracker()); + + // Periodic + CheckpointTriggerResult triggerResult = coord.triggerCheckpoint( + System.currentTimeMillis(), + CheckpointProperties.forStandardCheckpoint(), + null, + true); + + assertEquals(true, triggerResult.isFailure()); + assertEquals(CheckpointDeclineReason.PERIODIC_SCHEDULER_SHUTDOWN, triggerResult.getFailureReason()); + + // Not periodic + triggerResult = coord.triggerCheckpoint( + System.currentTimeMillis(), + CheckpointProperties.forStandardCheckpoint(), + null, + false); + + assertEquals(false, triggerResult.isFailure()); + } + private void testCreateKeyGroupPartitions(int maxParallelism, int parallelism) { List<KeyGroupRange> ranges = CheckpointCoordinator.createKeyGroupPartitions(maxParallelism, parallelism); for (int i = 0; i < maxParallelism; ++i) { @@ -2664,7 +2706,6 @@ public class CheckpointCoordinatorTest { } } - @Test public void testPartitionableStateRepartitioning() { Random r = new Random(42); @@ -2809,7 
+2850,7 @@ public class CheckpointCoordinatorTest { CheckpointProperties props = CheckpointProperties.forStandardSavepoint(); String targetDirectory = "xjasdkjakshdmmmxna"; - CheckpointTriggerResult triggerResult = coord.triggerCheckpoint(timestamp, props, targetDirectory); + CheckpointTriggerResult triggerResult = coord.triggerCheckpoint(timestamp, props, targetDirectory, false); assertEquals(true, triggerResult.isSuccess()); // validate that we have a pending checkpoint http://git-wip-us.apache.org/repos/asf/flink/blob/5783671c/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java index b4dcab5..950526c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java @@ -110,7 +110,7 @@ public class CheckpointStateRestoreTest { // create ourselves a checkpoint with state final long timestamp = 34623786L; - coord.triggerCheckpoint(timestamp); + coord.triggerCheckpoint(timestamp, false); PendingCheckpoint pending = coord.getPendingCheckpoints().values().iterator().next(); final long checkpointId = pending.getCheckpointId(); @@ -209,7 +209,7 @@ public class CheckpointStateRestoreTest { // create ourselves a checkpoint with state final long timestamp = 34623786L; - coord.triggerCheckpoint(timestamp); + coord.triggerCheckpoint(timestamp, false); PendingCheckpoint pending = coord.getPendingCheckpoints().values().iterator().next(); final long checkpointId = pending.getCheckpointId(); 
http://git-wip-us.apache.org/repos/asf/flink/blob/5783671c/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java index 2667743..84f0e1f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java @@ -210,7 +210,7 @@ public class PendingCheckpointTest { private static PendingCheckpoint createPendingCheckpoint(CheckpointProperties props, String targetDirectory) { Map<ExecutionAttemptID, ExecutionVertex> ackTasks = new HashMap<>(ACK_TASKS); - return new PendingCheckpoint(new JobID(), 0, 1, ackTasks, props, targetDirectory); + return new PendingCheckpoint(new JobID(), 0, 1, ackTasks, false, props, targetDirectory); } @SuppressWarnings("unchecked") http://git-wip-us.apache.org/repos/asf/flink/blob/5783671c/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java index 9277029..5ec6991 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java @@ -60,7 +60,6 @@ import org.apache.flink.runtime.state.ChainedStateHandle; import org.apache.flink.runtime.state.CheckpointStateHandles; import org.apache.flink.runtime.state.KeyGroupsStateHandle; import 
org.apache.flink.runtime.state.OperatorStateHandle; -import org.apache.flink.runtime.state.RetrievableStreamStateHandle; import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.runtime.state.memory.ByteStreamStateHandle; import org.apache.flink.runtime.taskmanager.TaskManager; @@ -447,9 +446,9 @@ public class JobManagerHARecoveryTest { @Override public void setInitialState( - ChainedStateHandle<StreamStateHandle> chainedState, - List<KeyGroupsStateHandle> keyGroupsState, - List<Collection<OperatorStateHandle>> partitionableOperatorState) throws Exception { + ChainedStateHandle<StreamStateHandle> chainedState, + List<KeyGroupsStateHandle> keyGroupsState, + List<Collection<OperatorStateHandle>> partitionableOperatorState) throws Exception { int subtaskIndex = getIndexInSubtaskGroup(); if (subtaskIndex < recoveredStates.length) { try (FSDataInputStream in = chainedState.get(0).openInputStream()) { @@ -465,10 +464,8 @@ public class JobManagerHARecoveryTest { String.valueOf(UUID.randomUUID()), InstantiationUtil.serializeObject(checkpointMetaData.getCheckpointId())); - RetrievableStreamStateHandle<Long> state = new RetrievableStreamStateHandle<Long>(byteStreamStateHandle); - ChainedStateHandle<StreamStateHandle> chainedStateHandle = - new ChainedStateHandle<StreamStateHandle>(Collections.singletonList(state)); + new ChainedStateHandle<StreamStateHandle>(Collections.singletonList(byteStreamStateHandle)); CheckpointStateHandles checkpointStateHandles = new CheckpointStateHandles(chainedStateHandle, null, Collections.<KeyGroupsStateHandle>emptyList()); http://git-wip-us.apache.org/repos/asf/flink/blob/5783671c/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java index 
8d150ac..183477a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java @@ -20,6 +20,7 @@ package org.apache.flink.runtime.jobmanager; import akka.actor.ActorRef; import akka.actor.ActorSystem; +import akka.actor.PoisonPill; import akka.testkit.JavaTestKit; import com.typesafe.config.Config; import org.apache.flink.api.common.JobID; @@ -27,6 +28,7 @@ import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.akka.ListeningBehaviour; +import org.apache.flink.runtime.checkpoint.CheckpointDeclineReason; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; @@ -43,8 +45,15 @@ import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings; +import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings; +import org.apache.flink.runtime.jobmanager.JobManagerHARecoveryTest.BlockingStatefulInvokable; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; import org.apache.flink.runtime.leaderretrieval.StandaloneLeaderRetrievalService; +import org.apache.flink.runtime.messages.JobManagerMessages; +import org.apache.flink.runtime.messages.JobManagerMessages.CancellationFailure; +import org.apache.flink.runtime.messages.JobManagerMessages.CancellationResponse; +import org.apache.flink.runtime.messages.JobManagerMessages.CancellationSuccess; import org.apache.flink.runtime.messages.JobManagerMessages.RequestPartitionState; import 
org.apache.flink.runtime.messages.JobManagerMessages.StopJob; import org.apache.flink.runtime.messages.JobManagerMessages.StoppingFailure; @@ -67,12 +76,17 @@ import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.NotifyWhe import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.RequestExecutionGraph; import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.WaitForAllVerticesToBeRunning; import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.WaitForAllVerticesToBeRunningOrFinished; +import org.apache.flink.runtime.testingUtils.TestingMemoryArchivist; import org.apache.flink.runtime.testingUtils.TestingTaskManager; +import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages; import org.apache.flink.runtime.testingUtils.TestingUtils; import org.apache.flink.runtime.testutils.StoppableInvokable; import org.junit.AfterClass; import org.junit.BeforeClass; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import scala.Option; import scala.Some; import scala.Tuple2; import scala.concurrent.Await; @@ -81,7 +95,9 @@ import scala.concurrent.duration.Deadline; import scala.concurrent.duration.FiniteDuration; import scala.reflect.ClassTag$; +import java.io.File; import java.net.InetAddress; +import java.util.Collections; import java.util.UUID; import java.util.concurrent.TimeUnit; @@ -94,6 +110,7 @@ import static org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.No import static org.apache.flink.runtime.testingUtils.TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT; import static org.apache.flink.runtime.testingUtils.TestingUtils.startTestingCluster; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; @@ -101,6 +118,9 @@ import static org.junit.Assert.fail; public class JobManagerTest { + 
@Rule + public TemporaryFolder tmpFolder = new TemporaryFolder(); + private static ActorSystem system; @BeforeClass @@ -562,4 +582,126 @@ public class JobManagerTest { JobStatusIs jobStatus = Await.result(failedFuture, deadline.timeLeft()); assertEquals(JobStatus.FAILED, jobStatus.state()); } + + @Test + public void testCancelWithSavepoint() throws Exception { + File defaultSavepointDir = tmpFolder.newFolder(); + + FiniteDuration timeout = new FiniteDuration(30, TimeUnit.SECONDS); + Configuration config = new Configuration(); + config.setString(ConfigConstants.SAVEPOINT_DIRECTORY_KEY, defaultSavepointDir.getAbsolutePath()); + + ActorSystem actorSystem = null; + ActorGateway jobManager = null; + ActorGateway archiver = null; + ActorGateway taskManager = null; + try { + actorSystem = AkkaUtils.createLocalActorSystem(new Configuration()); + + Tuple2<ActorRef, ActorRef> master = JobManager.startJobManagerActors( + config, + actorSystem, + Option.apply("jm"), + Option.apply("arch"), + TestingJobManager.class, + TestingMemoryArchivist.class); + + jobManager = new AkkaActorGateway(master._1(), null); + archiver = new AkkaActorGateway(master._2(), null); + + ActorRef taskManagerRef = TaskManager.startTaskManagerComponentsAndActor( + config, + ResourceID.generate(), + actorSystem, + "localhost", + Option.apply("tm"), + Option.<LeaderRetrievalService>apply(new StandaloneLeaderRetrievalService(jobManager.path())), + true, + TestingTaskManager.class); + + taskManager = new AkkaActorGateway(taskManagerRef, null); + + // Wait until connected + Object msg = new TestingTaskManagerMessages.NotifyWhenRegisteredAtJobManager(jobManager.actor()); + Await.ready(taskManager.ask(msg, timeout), timeout); + + // Create job graph + JobVertex sourceVertex = new JobVertex("Source"); + sourceVertex.setInvokableClass(BlockingStatefulInvokable.class); + sourceVertex.setParallelism(1); + + JobGraph jobGraph = new JobGraph("TestingJob", sourceVertex); + + JobSnapshottingSettings 
snapshottingSettings = new JobSnapshottingSettings( + Collections.singletonList(sourceVertex.getID()), + Collections.singletonList(sourceVertex.getID()), + Collections.singletonList(sourceVertex.getID()), + 3600000, + 3600000, + 0, + Integer.MAX_VALUE, + ExternalizedCheckpointSettings.none()); + + jobGraph.setSnapshotSettings(snapshottingSettings); + + // Submit job graph + msg = new JobManagerMessages.SubmitJob(jobGraph, ListeningBehaviour.DETACHED); + Await.result(jobManager.ask(msg, timeout), timeout); + + // Wait for all tasks to be running + msg = new TestingJobManagerMessages.WaitForAllVerticesToBeRunning(jobGraph.getJobID()); + Await.result(jobManager.ask(msg, timeout), timeout); + + // Notify when cancelled + msg = new NotifyWhenJobStatus(jobGraph.getJobID(), JobStatus.CANCELED); + Future<Object> cancelled = jobManager.ask(msg, timeout); + + // Cancel with savepoint + String savepointPath = null; + + for (int i = 0; i < 10; i++) { + msg = new JobManagerMessages.CancelJobWithSavepoint(jobGraph.getJobID(), null); + CancellationResponse cancelResp = (CancellationResponse) Await.result(jobManager.ask(msg, timeout), timeout); + + if (cancelResp instanceof CancellationFailure) { + CancellationFailure failure = (CancellationFailure) cancelResp; + if (failure.cause().getMessage().contains(CheckpointDeclineReason.NOT_ALL_REQUIRED_TASKS_RUNNING.message())) { + Thread.sleep(200); // wait and retry + } else { + failure.cause().printStackTrace(); + fail("Failed to cancel job: " + failure.cause().getMessage()); + } + } else { + savepointPath = ((CancellationSuccess) cancelResp).savepointPath(); + break; + } + } + + // Verify savepoint path + assertNotEquals("Savepoint not triggered", null, savepointPath); + + // Wait for job status change + Await.ready(cancelled, timeout); + + File savepointFile = new File(savepointPath); + assertEquals(true, savepointFile.exists()); + } finally { + if (actorSystem != null) { + actorSystem.shutdown(); + } + + if (archiver != null) { 
archiver.actor().tell(PoisonPill.getInstance(), ActorRef.noSender()); + } + + if (jobManager != null) { + jobManager.actor().tell(PoisonPill.getInstance(), ActorRef.noSender()); + } + + if (taskManager != null) { + taskManager.actor().tell(PoisonPill.getInstance(), ActorRef.noSender()); + } + } + + } } http://git-wip-us.apache.org/repos/asf/flink/blob/5783671c/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java index fc2835d..74de942 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java @@ -64,6 +64,7 @@ import org.apache.flink.testutils.junit.RetryRule; import org.apache.flink.util.TestLogger; import org.junit.Rule; import org.junit.Test; +import org.junit.rules.TemporaryFolder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import scala.Option; @@ -100,6 +101,9 @@ public class SavepointITCase extends TestLogger { @Rule public RetryRule retryRule = new RetryRule(); + @Rule + public TemporaryFolder folder = new TemporaryFolder(); + /** * Tests that it is possible to submit a job, trigger a savepoint, and * later restart the job on a new cluster. The savepoint is written to