Repository: flink
Updated Branches:
  refs/heads/release-1.2 37d6f8196 -> 1864b4e4f


[FLINK-5699] [savepoints] Check target dir when cancelling with savepoint

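Problem: when cancelling a job with a savepoint while no savepoint directory
is configured (neither passed with the cancel request nor set as a cluster-wide
default), triggering the savepoint fails with a NullPointerException (NPE).
This NPE is then returned to the user as the root cause.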

Solution: Instead of simply forwarding the target directory argument (which
may be null) to the savepoint trigger, we check it for null and respond with a
CancellationFailure wrapping an IllegalStateException with a meaningful message.

This closes #3263.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1864b4e4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1864b4e4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1864b4e4

Branch: refs/heads/release-1.2
Commit: 1864b4e4f9cb8bd335ec1821d17d5274bca5b116
Parents: 37d6f81
Author: Ufuk Celebi <u...@apache.org>
Authored: Fri Feb 3 17:28:27 2017 +0100
Committer: Ufuk Celebi <u...@apache.org>
Committed: Tue Feb 7 11:48:51 2017 +0100

----------------------------------------------------------------------
 .../flink/client/CliFrontendListCancelTest.java |   2 +-
 .../flink/runtime/jobmanager/JobManager.scala   |  76 +++++++------
 .../runtime/jobmanager/JobManagerTest.java      | 107 ++++++++++++++++++-
 3 files changed, 148 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/1864b4e4/flink-clients/src/test/java/org/apache/flink/client/CliFrontendListCancelTest.java
----------------------------------------------------------------------
diff --git 
a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendListCancelTest.java
 
b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendListCancelTest.java
index 53311ef..4d3405f 100644
--- 
a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendListCancelTest.java
+++ 
b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendListCancelTest.java
@@ -181,7 +181,7 @@ public class CliFrontendListCancelTest {
                }
 
                {
-                       // Cancel with savepoint (no target directory)and no 
job ID
+                       // Cancel with savepoint (no target directory) and no 
job ID
                        JobID jid = new JobID();
                        UUID leaderSessionID = UUID.randomUUID();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/1864b4e4/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 50a619c..3025727 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -586,44 +586,54 @@ class JobManager(
           defaultSavepointDir
         }
 
-        log.info(s"Trying to cancel job $jobId with savepoint to 
$targetDirectory")
+        if (targetDirectory == null) {
+          log.info(s"Trying to cancel job $jobId with savepoint, but no " +
+            "savepoint directory configured.")
+
+          sender ! decorateMessage(CancellationFailure(jobId, new 
IllegalStateException(
+            "No savepoint directory configured. You can either specify a 
directory " +
+              "while cancelling via -s :targetDirectory or configure a 
cluster-wide " +
+              "default via key '" + ConfigConstants.SAVEPOINT_DIRECTORY_KEY + 
"'.")))
+        } else {
+          log.info(s"Trying to cancel job $jobId with savepoint to 
$targetDirectory")
 
-        currentJobs.get(jobId) match {
-          case Some((executionGraph, _)) =>
-            // We don't want any checkpoint between the savepoint and 
cancellation
-            val coord = executionGraph.getCheckpointCoordinator
-            coord.stopCheckpointScheduler()
+          currentJobs.get(jobId) match {
+            case Some((executionGraph, _)) =>
+              // We don't want any checkpoint between the savepoint and 
cancellation
+              val coord = executionGraph.getCheckpointCoordinator
+              coord.stopCheckpointScheduler()
 
-            // Trigger the savepoint
-            val future = coord.triggerSavepoint(System.currentTimeMillis(), 
targetDirectory)
+              // Trigger the savepoint
+              val future = coord.triggerSavepoint(System.currentTimeMillis(), 
targetDirectory)
 
-            val senderRef = sender()
-            future.handleAsync[Void](
-              new BiFunction[CompletedCheckpoint, Throwable, Void] {
-                override def apply(success: CompletedCheckpoint, cause: 
Throwable): Void = {
-                  if (success != null) {
-                    val path = success.getExternalPath()
-                    log.info(s"Savepoint stored in $path. Now cancelling 
$jobId.")
-                    executionGraph.cancel()
-                    senderRef ! decorateMessage(CancellationSuccess(jobId, 
path))
-                  } else {
-                    val msg = CancellationFailure(
-                      jobId,
-                      new Exception("Failed to trigger savepoint.", cause))
-                    senderRef ! decorateMessage(msg)
+              val senderRef = sender()
+              future.handleAsync[Void](
+                new BiFunction[CompletedCheckpoint, Throwable, Void] {
+                  override def apply(success: CompletedCheckpoint, cause: 
Throwable): Void = {
+                    if (success != null) {
+                      val path = success.getExternalPath()
+                      log.info(s"Savepoint stored in $path. Now cancelling 
$jobId.")
+                      executionGraph.cancel()
+                      senderRef ! decorateMessage(CancellationSuccess(jobId, 
path))
+                    } else {
+                      val msg = CancellationFailure(
+                        jobId,
+                        new Exception("Failed to trigger savepoint.", cause))
+                      senderRef ! decorateMessage(msg)
+                    }
+                    null
                   }
-                  null
-                }
-              },
-              context.dispatcher)
+                },
+                context.dispatcher)
 
-          case None =>
-            log.info(s"No job found with ID $jobId.")
-            sender ! decorateMessage(
-              CancellationFailure(
-                jobId,
-                new IllegalArgumentException(s"No job found with ID $jobId."))
-            )
+            case None =>
+              log.info(s"No job found with ID $jobId.")
+              sender ! decorateMessage(
+                CancellationFailure(
+                  jobId,
+                  new IllegalArgumentException(s"No job found with ID 
$jobId."))
+              )
+          }
         }
       } catch {
         case t: Throwable =>

http://git-wip-us.apache.org/repos/asf/flink/blob/1864b4e4/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
index 2fd2b98..6fffe28 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
@@ -892,9 +892,110 @@ public class JobManagerTest {
        }
 
        /**
-        * Tests that we can trigger a
-        *
-        * @throws Exception
+        * Tests that a meaningful exception is returned if no savepoint 
directory is
+        * configured.
+        */
+       @Test
+       public void testCancelWithSavepointNoDirectoriesConfigured() throws 
Exception {
+               FiniteDuration timeout = new FiniteDuration(30, 
TimeUnit.SECONDS);
+               Configuration config = new Configuration();
+
+               ActorSystem actorSystem = null;
+               ActorGateway jobManager = null;
+               ActorGateway archiver = null;
+               ActorGateway taskManager = null;
+               try {
+                       actorSystem = AkkaUtils.createLocalActorSystem(new 
Configuration());
+
+                       Tuple2<ActorRef, ActorRef> master = 
JobManager.startJobManagerActors(
+                               config,
+                               actorSystem,
+                               actorSystem.dispatcher(),
+                               actorSystem.dispatcher(),
+                               Option.apply("jm"),
+                               Option.apply("arch"),
+                               TestingJobManager.class,
+                               TestingMemoryArchivist.class);
+
+                       jobManager = new AkkaActorGateway(master._1(), null);
+                       archiver = new AkkaActorGateway(master._2(), null);
+
+                       ActorRef taskManagerRef = 
TaskManager.startTaskManagerComponentsAndActor(
+                               config,
+                               ResourceID.generate(),
+                               actorSystem,
+                               "localhost",
+                               Option.apply("tm"),
+                               Option.<LeaderRetrievalService>apply(new 
StandaloneLeaderRetrievalService(jobManager.path())),
+                               true,
+                               TestingTaskManager.class);
+
+                       taskManager = new AkkaActorGateway(taskManagerRef, 
null);
+
+                       // Wait until connected
+                       Object msg = new 
TestingTaskManagerMessages.NotifyWhenRegisteredAtJobManager(jobManager.actor());
+                       Await.ready(taskManager.ask(msg, timeout), timeout);
+
+                       // Create job graph
+                       JobVertex sourceVertex = new JobVertex("Source");
+                       
sourceVertex.setInvokableClass(BlockingStatefulInvokable.class);
+                       sourceVertex.setParallelism(1);
+
+                       JobGraph jobGraph = new JobGraph("TestingJob", 
sourceVertex);
+
+                       JobSnapshottingSettings snapshottingSettings = new 
JobSnapshottingSettings(
+                               Collections.singletonList(sourceVertex.getID()),
+                               Collections.singletonList(sourceVertex.getID()),
+                               Collections.singletonList(sourceVertex.getID()),
+                               3600000,
+                               3600000,
+                               0,
+                               Integer.MAX_VALUE,
+                               ExternalizedCheckpointSettings.none(),
+                               true);
+
+                       jobGraph.setSnapshotSettings(snapshottingSettings);
+
+                       // Submit job graph
+                       msg = new JobManagerMessages.SubmitJob(jobGraph, 
ListeningBehaviour.DETACHED);
+                       Await.result(jobManager.ask(msg, timeout), timeout);
+
+                       // Wait for all tasks to be running
+                       msg = new 
TestingJobManagerMessages.WaitForAllVerticesToBeRunning(jobGraph.getJobID());
+                       Await.result(jobManager.ask(msg, timeout), timeout);
+
+                       // Cancel with savepoint
+                       msg = new 
JobManagerMessages.CancelJobWithSavepoint(jobGraph.getJobID(), null);
+                       CancellationResponse cancelResp = 
(CancellationResponse) Await.result(jobManager.ask(msg, timeout), timeout);
+
+                       if (cancelResp instanceof CancellationFailure) {
+                               CancellationFailure failure = 
(CancellationFailure) cancelResp;
+                               assertTrue(failure.cause() instanceof 
IllegalStateException);
+                               
assertTrue(failure.cause().getMessage().contains("savepoint directory"));
+                       } else {
+                               fail("Unexpected cancellation response from 
JobManager: " + cancelResp);
+                       }
+               } finally {
+                       if (actorSystem != null) {
+                               actorSystem.shutdown();
+                       }
+
+                       if (archiver != null) {
+                               archiver.actor().tell(PoisonPill.getInstance(), 
ActorRef.noSender());
+                       }
+
+                       if (jobManager != null) {
+                               
jobManager.actor().tell(PoisonPill.getInstance(), ActorRef.noSender());
+                       }
+
+                       if (taskManager != null) {
+                               
taskManager.actor().tell(PoisonPill.getInstance(), ActorRef.noSender());
+                       }
+               }
+       }
+
+       /**
+        * Tests that we can trigger a savepoint when periodic checkpoints are 
disabled.
         */
        @Test
        public void testSavepointWithDeactivatedPeriodicCheckpointing() throws 
Exception {

Reply via email to