Repository: flink Updated Branches: refs/heads/release-1.2 37d6f8196 -> 1864b4e4f
[FLINK-5699] [savepoints] Check target dir when cancelling with savepoint Problem: when cancelling a job with a savepoint and no savepoint directory is configured, triggering the savepoint fails with an NPE. This is then returned to the user as the root cause. Solution: Instead of simply forwarding the argument (which is possibly null), we check it for null and return an IllegalStateException with a meaningful message. This closes #3263. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1864b4e4 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1864b4e4 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1864b4e4 Branch: refs/heads/release-1.2 Commit: 1864b4e4f9cb8bd335ec1821d17d5274bca5b116 Parents: 37d6f81 Author: Ufuk Celebi <u...@apache.org> Authored: Fri Feb 3 17:28:27 2017 +0100 Committer: Ufuk Celebi <u...@apache.org> Committed: Tue Feb 7 11:48:51 2017 +0100 ---------------------------------------------------------------------- .../flink/client/CliFrontendListCancelTest.java | 2 +- .../flink/runtime/jobmanager/JobManager.scala | 76 +++++++------ .../runtime/jobmanager/JobManagerTest.java | 107 ++++++++++++++++++- 3 files changed, 148 insertions(+), 37 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/1864b4e4/flink-clients/src/test/java/org/apache/flink/client/CliFrontendListCancelTest.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendListCancelTest.java b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendListCancelTest.java index 53311ef..4d3405f 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendListCancelTest.java +++ b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendListCancelTest.java @@ -181,7 +181,7 @@ public class CliFrontendListCancelTest 
{ } { - // Cancel with savepoint (no target directory)and no job ID + // Cancel with savepoint (no target directory) and no job ID JobID jid = new JobID(); UUID leaderSessionID = UUID.randomUUID(); http://git-wip-us.apache.org/repos/asf/flink/blob/1864b4e4/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala index 50a619c..3025727 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala @@ -586,44 +586,54 @@ class JobManager( defaultSavepointDir } - log.info(s"Trying to cancel job $jobId with savepoint to $targetDirectory") + if (targetDirectory == null) { + log.info(s"Trying to cancel job $jobId with savepoint, but no " + + "savepoint directory configured.") + + sender ! decorateMessage(CancellationFailure(jobId, new IllegalStateException( + "No savepoint directory configured. 
You can either specify a directory " + + "while cancelling via -s :targetDirectory or configure a cluster-wide " + + "default via key '" + ConfigConstants.SAVEPOINT_DIRECTORY_KEY + "'."))) + } else { + log.info(s"Trying to cancel job $jobId with savepoint to $targetDirectory") - currentJobs.get(jobId) match { - case Some((executionGraph, _)) => - // We don't want any checkpoint between the savepoint and cancellation - val coord = executionGraph.getCheckpointCoordinator - coord.stopCheckpointScheduler() + currentJobs.get(jobId) match { + case Some((executionGraph, _)) => + // We don't want any checkpoint between the savepoint and cancellation + val coord = executionGraph.getCheckpointCoordinator + coord.stopCheckpointScheduler() - // Trigger the savepoint - val future = coord.triggerSavepoint(System.currentTimeMillis(), targetDirectory) + // Trigger the savepoint + val future = coord.triggerSavepoint(System.currentTimeMillis(), targetDirectory) - val senderRef = sender() - future.handleAsync[Void]( - new BiFunction[CompletedCheckpoint, Throwable, Void] { - override def apply(success: CompletedCheckpoint, cause: Throwable): Void = { - if (success != null) { - val path = success.getExternalPath() - log.info(s"Savepoint stored in $path. Now cancelling $jobId.") - executionGraph.cancel() - senderRef ! decorateMessage(CancellationSuccess(jobId, path)) - } else { - val msg = CancellationFailure( - jobId, - new Exception("Failed to trigger savepoint.", cause)) - senderRef ! decorateMessage(msg) + val senderRef = sender() + future.handleAsync[Void]( + new BiFunction[CompletedCheckpoint, Throwable, Void] { + override def apply(success: CompletedCheckpoint, cause: Throwable): Void = { + if (success != null) { + val path = success.getExternalPath() + log.info(s"Savepoint stored in $path. Now cancelling $jobId.") + executionGraph.cancel() + senderRef ! 
decorateMessage(CancellationSuccess(jobId, path)) + } else { + val msg = CancellationFailure( + jobId, + new Exception("Failed to trigger savepoint.", cause)) + senderRef ! decorateMessage(msg) + } + null } - null - } - }, - context.dispatcher) + }, + context.dispatcher) - case None => - log.info(s"No job found with ID $jobId.") - sender ! decorateMessage( - CancellationFailure( - jobId, - new IllegalArgumentException(s"No job found with ID $jobId.")) - ) + case None => + log.info(s"No job found with ID $jobId.") + sender ! decorateMessage( + CancellationFailure( + jobId, + new IllegalArgumentException(s"No job found with ID $jobId.")) + ) + } } } catch { case t: Throwable => http://git-wip-us.apache.org/repos/asf/flink/blob/1864b4e4/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java index 2fd2b98..6fffe28 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java @@ -892,9 +892,110 @@ public class JobManagerTest { } /** - * Tests that we can trigger a - * - * @throws Exception + * Tests that a meaningful exception is returned if no savepoint directory is + * configured. 
+ */ + @Test + public void testCancelWithSavepointNoDirectoriesConfigured() throws Exception { + FiniteDuration timeout = new FiniteDuration(30, TimeUnit.SECONDS); + Configuration config = new Configuration(); + + ActorSystem actorSystem = null; + ActorGateway jobManager = null; + ActorGateway archiver = null; + ActorGateway taskManager = null; + try { + actorSystem = AkkaUtils.createLocalActorSystem(new Configuration()); + + Tuple2<ActorRef, ActorRef> master = JobManager.startJobManagerActors( + config, + actorSystem, + actorSystem.dispatcher(), + actorSystem.dispatcher(), + Option.apply("jm"), + Option.apply("arch"), + TestingJobManager.class, + TestingMemoryArchivist.class); + + jobManager = new AkkaActorGateway(master._1(), null); + archiver = new AkkaActorGateway(master._2(), null); + + ActorRef taskManagerRef = TaskManager.startTaskManagerComponentsAndActor( + config, + ResourceID.generate(), + actorSystem, + "localhost", + Option.apply("tm"), + Option.<LeaderRetrievalService>apply(new StandaloneLeaderRetrievalService(jobManager.path())), + true, + TestingTaskManager.class); + + taskManager = new AkkaActorGateway(taskManagerRef, null); + + // Wait until connected + Object msg = new TestingTaskManagerMessages.NotifyWhenRegisteredAtJobManager(jobManager.actor()); + Await.ready(taskManager.ask(msg, timeout), timeout); + + // Create job graph + JobVertex sourceVertex = new JobVertex("Source"); + sourceVertex.setInvokableClass(BlockingStatefulInvokable.class); + sourceVertex.setParallelism(1); + + JobGraph jobGraph = new JobGraph("TestingJob", sourceVertex); + + JobSnapshottingSettings snapshottingSettings = new JobSnapshottingSettings( + Collections.singletonList(sourceVertex.getID()), + Collections.singletonList(sourceVertex.getID()), + Collections.singletonList(sourceVertex.getID()), + 3600000, + 3600000, + 0, + Integer.MAX_VALUE, + ExternalizedCheckpointSettings.none(), + true); + + jobGraph.setSnapshotSettings(snapshottingSettings); + + // Submit job graph + 
msg = new JobManagerMessages.SubmitJob(jobGraph, ListeningBehaviour.DETACHED); + Await.result(jobManager.ask(msg, timeout), timeout); + + // Wait for all tasks to be running + msg = new TestingJobManagerMessages.WaitForAllVerticesToBeRunning(jobGraph.getJobID()); + Await.result(jobManager.ask(msg, timeout), timeout); + + // Cancel with savepoint + msg = new JobManagerMessages.CancelJobWithSavepoint(jobGraph.getJobID(), null); + CancellationResponse cancelResp = (CancellationResponse) Await.result(jobManager.ask(msg, timeout), timeout); + + if (cancelResp instanceof CancellationFailure) { + CancellationFailure failure = (CancellationFailure) cancelResp; + assertTrue(failure.cause() instanceof IllegalStateException); + assertTrue(failure.cause().getMessage().contains("savepoint directory")); + } else { + fail("Unexpected cancellation response from JobManager: " + cancelResp); + } + } finally { + if (actorSystem != null) { + actorSystem.shutdown(); + } + + if (archiver != null) { + archiver.actor().tell(PoisonPill.getInstance(), ActorRef.noSender()); + } + + if (jobManager != null) { + jobManager.actor().tell(PoisonPill.getInstance(), ActorRef.noSender()); + } + + if (taskManager != null) { + taskManager.actor().tell(PoisonPill.getInstance(), ActorRef.noSender()); + } + } + } + + /** + * Tests that we can trigger a savepoint when periodic checkpoints are disabled. */ @Test public void testSavepointWithDeactivatedPeriodicCheckpointing() throws Exception {