Repository: flink
Updated Branches:
  refs/heads/master e29ac036a -> 28c57c3a5


[FLINK-3800] [jobmanager] Terminate ExecutionGraphs properly

This PR terminates ExecutionGraphs properly, without restarting them, when the
JobManager calls cancelAndClearEverything. This is achieved by allowing the
method to be called only with a SuppressRestartsException. The
SuppressRestartsException disables the restart strategy of the respective
ExecutionGraph. This is important because the root cause of the failure could
be a different exception, so checking the type of the failure cause alone is
not sufficient. In order to avoid race conditions, the restart strategy has to
be checked twice to determine whether it allows restarting the job: once before
and once after the job has transitioned to the state RESTARTING. This prevents
ExecutionGraphs from becoming orphaned.

Furthermore, this PR fixes the problem that the default restart strategy is
shared by multiple jobs. The problem is solved by introducing a
RestartStrategyFactory, which creates a separate RestartStrategy instance for
every job.

Fix LeaderChangeJobRecoveryTest case

This closes #1923.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/28c57c3a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/28c57c3a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/28c57c3a

Branch: refs/heads/master
Commit: 28c57c3a57cbd7a02e756ee98c0b1168cec69feb
Parents: e29ac03
Author: Till Rohrmann <trohrm...@apache.org>
Authored: Thu Apr 21 17:07:51 2016 +0200
Committer: Till Rohrmann <trohrm...@apache.org>
Committed: Tue Apr 26 19:04:36 2016 +0200

----------------------------------------------------------------------
 .../runtime/executiongraph/ExecutionGraph.java  |  28 ++-
 .../restart/FixedDelayRestartStrategy.java      |  30 ++-
 .../restart/NoRestartStrategy.java              |  17 +-
 .../executiongraph/restart/RestartStrategy.java |   5 +
 .../restart/RestartStrategyFactory.java         |  28 ++-
 .../flink/runtime/jobmanager/JobManager.scala   |  16 +-
 .../flink/runtime/taskmanager/TaskManager.scala |   2 +-
 .../JobManagerLeaderElectionTest.java           |   2 +-
 .../LeaderChangeJobRecoveryTest.java            | 201 +++++++++++++++++++
 .../LeaderChangeStateCleanupTest.java           |   2 +-
 .../LeaderElectionRetrievalTestingCluster.java  |  23 ++-
 .../runtime/testingUtils/TestingCluster.scala   |  10 +-
 .../testingUtils/TestingJobManager.scala        |   6 +-
 .../flink/yarn/TestingYarnJobManager.scala      |   8 +-
 .../org/apache/flink/yarn/YarnJobManager.scala  |   8 +-
 15 files changed, 336 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/28c57c3a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 9ee8ee5..3796402 100755
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -798,6 +798,14 @@ public class ExecutionGraph implements Serializable {
        }
 
        public void fail(Throwable t) {
+               if (t instanceof SuppressRestartsException) {
+                       if (restartStrategy != null) {
+                               // disable the restart strategy in case that we 
have seen a SuppressRestartsException
+                               // it basically overrides the restart behaviour 
of a the root cause
+                               restartStrategy.disable();
+                       }
+               }
+
                while (true) {
                        JobStatus current = state;
                        if (current == JobStatus.FAILING || 
current.isTerminalState()) {
@@ -1021,15 +1029,17 @@ public class ExecutionGraph implements Serializable {
                                                }
                                        }
                                        else if (current == JobStatus.FAILING) {
-                                               boolean allowRestart = 
!(failureCause instanceof SuppressRestartsException);
-
-                                               if (allowRestart && 
restartStrategy.canRestart() &&
-                                                               
transitionState(current, JobStatus.RESTARTING)) {
-                                                       
restartStrategy.restart(this);
-                                                       break;
-
-                                               } else if ((!allowRestart || 
!restartStrategy.canRestart()) &&
-                                                       
transitionState(current, JobStatus.FAILED, failureCause)) {
+                                               if 
(restartStrategy.canRestart() && transitionState(current, 
JobStatus.RESTARTING)) {
+                                                       // double check in case 
that in the meantime a SuppressRestartsException was thrown
+                                                       if 
(restartStrategy.canRestart()) {
+                                                               
restartStrategy.restart(this);
+                                                               break;
+                                                       } else {
+                                                               fail(new 
Exception("ExecutionGraph went into RESTARTING state but " +
+                                                                       "then 
the restart strategy was disabled."));
+                                                       }
+
+                                               } else if 
(!restartStrategy.canRestart() && transitionState(current, JobStatus.FAILED, 
failureCause)) {
                                                        postRunCleanup();
                                                        break;
                                                }

http://git-wip-us.apache.org/repos/asf/flink/blob/28c57c3a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategy.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategy.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategy.java
index d3c7eba..464b48e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategy.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategy.java
@@ -41,6 +41,7 @@ public class FixedDelayRestartStrategy implements 
RestartStrategy {
        private final int maxNumberRestartAttempts;
        private final long delayBetweenRestartAttempts;
        private int currentRestartAttempt;
+       private boolean disabled = false;
 
        public FixedDelayRestartStrategy(
                int maxNumberRestartAttempts,
@@ -60,7 +61,7 @@ public class FixedDelayRestartStrategy implements 
RestartStrategy {
 
        @Override
        public boolean canRestart() {
-               return currentRestartAttempt < maxNumberRestartAttempts;
+               return !disabled && currentRestartAttempt < 
maxNumberRestartAttempts;
        }
 
        @Override
@@ -83,6 +84,11 @@ public class FixedDelayRestartStrategy implements 
RestartStrategy {
                }, executionGraph.getExecutionContext());
        }
 
+       @Override
+       public void disable() {
+               disabled = true;
+       }
+
        /**
         * Creates a FixedDelayRestartStrategy from the given Configuration.
         *
@@ -90,7 +96,7 @@ public class FixedDelayRestartStrategy implements 
RestartStrategy {
         * @return Initialized instance of FixedDelayRestartStrategy
         * @throws Exception
         */
-       public static FixedDelayRestartStrategy create(Configuration 
configuration) throws Exception {
+       public static FixedDelayRestartStrategyFactory 
createFactory(Configuration configuration) throws Exception {
                int maxAttempts = 
configuration.getInteger(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, 
1);
 
                String timeoutString = configuration.getString(
@@ -118,7 +124,7 @@ public class FixedDelayRestartStrategy implements 
RestartStrategy {
                        }
                }
 
-               return new FixedDelayRestartStrategy(maxAttempts, delay);
+               return new FixedDelayRestartStrategyFactory(maxAttempts, delay);
        }
 
        @Override
@@ -128,4 +134,22 @@ public class FixedDelayRestartStrategy implements 
RestartStrategy {
                                ", delayBetweenRestartAttempts=" + 
delayBetweenRestartAttempts +
                                ')';
        }
+
+       public static class FixedDelayRestartStrategyFactory extends 
RestartStrategyFactory {
+
+               private static final long serialVersionUID = 
6642934067762271950L;
+
+               private final int maxAttempts;
+               private final long delay;
+
+               public FixedDelayRestartStrategyFactory(int maxAttempts, long 
delay) {
+                       this.maxAttempts = maxAttempts;
+                       this.delay = delay;
+               }
+
+               @Override
+               public RestartStrategy createRestartStrategy() {
+                       return new FixedDelayRestartStrategy(maxAttempts, 
delay);
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/28c57c3a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/NoRestartStrategy.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/NoRestartStrategy.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/NoRestartStrategy.java
index 8911a98..6cc5ee4 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/NoRestartStrategy.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/NoRestartStrategy.java
@@ -36,18 +36,31 @@ public class NoRestartStrategy implements RestartStrategy {
                throw new RuntimeException("NoRestartStrategy does not support 
restart.");
        }
 
+       @Override
+       public void disable() {}
+
        /**
         * Creates a NoRestartStrategy instance.
         *
         * @param configuration Configuration object which is ignored
         * @return NoRestartStrategy instance
         */
-       public static NoRestartStrategy create(Configuration configuration) {
-               return new NoRestartStrategy();
+       public static NoRestartStrategyFactory createFactory(Configuration 
configuration) {
+               return new NoRestartStrategyFactory();
        }
 
        @Override
        public String toString() {
                return "NoRestartStrategy";
        }
+
+       public static class NoRestartStrategyFactory extends 
RestartStrategyFactory {
+
+               private static final long serialVersionUID = 
-1809462525812787862L;
+
+               @Override
+               public RestartStrategy createRestartStrategy() {
+                       return new NoRestartStrategy();
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/28c57c3a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategy.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategy.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategy.java
index 2880c01..c9e6277 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategy.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategy.java
@@ -38,4 +38,9 @@ public interface RestartStrategy {
         * @param executionGraph The ExecutionGraph to be restarted
         */
        void restart(ExecutionGraph executionGraph);
+
+       /**
+        * Disables the restart strategy.
+        */
+       void disable();
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/28c57c3a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyFactory.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyFactory.java
index 68d114e..e58d775 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyFactory.java
@@ -25,12 +25,21 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.concurrent.duration.Duration;
 
+import java.io.Serializable;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 
-public class RestartStrategyFactory {
+public abstract class RestartStrategyFactory implements Serializable {
+       private static final long serialVersionUID = 7320252552640522191L;
+
        private static final Logger LOG = 
LoggerFactory.getLogger(RestartStrategyFactory.class);
-       private static final String CREATE_METHOD = "create";
+       private static final String CREATE_METHOD = "createFactory";
+
+       /**
+        * Factory method to create a restart strategy
+        * @return The created restart strategy
+        */
+       public abstract RestartStrategy createRestartStrategy();
 
        /**
         * Creates a {@link RestartStrategy} instance from the given {@link 
org.apache.flink.api.common.restartstrategy.RestartStrategies.RestartStrategyConfiguration}.
@@ -58,11 +67,10 @@ public class RestartStrategyFactory {
        /**
         * Creates a {@link RestartStrategy} instance from the given {@link 
Configuration}.
         *
-        * @param configuration Configuration object containing the 
configuration values.
         * @return RestartStrategy instance
         * @throws Exception which indicates that the RestartStrategy could not 
be instantiated.
         */
-       public static RestartStrategy createFromConfig(Configuration 
configuration) throws Exception {
+       public static RestartStrategyFactory 
createRestartStrategyFactory(Configuration configuration) throws Exception {
                String restartStrategyName = 
configuration.getString(ConfigConstants.RESTART_STRATEGY, "none").toLowerCase();
 
                switch (restartStrategyName) {
@@ -92,16 +100,16 @@ public class RestartStrategyFactory {
                                }
 
                                if (numberExecutionRetries > 0 && delay >= 0) {
-                                       return new 
FixedDelayRestartStrategy(numberExecutionRetries, delay);
+                                       return new 
FixedDelayRestartStrategy.FixedDelayRestartStrategyFactory(numberExecutionRetries,
 delay);
                                } else {
-                                       return 
NoRestartStrategy.create(configuration);
+                                       return 
NoRestartStrategy.createFactory(configuration);
                                }
                        case "off":
                        case "disable":
-                               return NoRestartStrategy.create(configuration);
+                               return 
NoRestartStrategy.createFactory(configuration);
                        case "fixeddelay":
                        case "fixed-delay":
-                               return 
FixedDelayRestartStrategy.create(configuration);
+                               return 
FixedDelayRestartStrategy.createFactory(configuration);
                        default:
                                try {
                                        Class<?> clazz = 
Class.forName(restartStrategyName);
@@ -113,7 +121,7 @@ public class RestartStrategyFactory {
                                                        Object result = 
method.invoke(null, configuration);
 
                                                        if (result != null) {
-                                                               return 
(RestartStrategy) result;
+                                                               return 
(RestartStrategyFactory) result;
                                                        }
                                                }
                                        }
@@ -128,7 +136,7 @@ public class RestartStrategyFactory {
                                }
 
                                // fallback in case of an error
-                               return NoRestartStrategy.create(configuration);
+                               return 
NoRestartStrategy.createFactory(configuration);
                }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/28c57c3a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 9f0482e..d8b8a01 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -28,6 +28,7 @@ import akka.actor._
 import akka.pattern.ask
 
 import grizzled.slf4j.Logger
+import 
org.apache.flink.api.common.restartstrategy.RestartStrategies.RestartStrategyConfiguration
 
 import org.apache.flink.api.common.{ExecutionConfig, JobID}
 import org.apache.flink.configuration.{ConfigConstants, Configuration, 
GlobalConfiguration}
@@ -118,7 +119,7 @@ class JobManager(
     protected val scheduler: FlinkScheduler,
     protected val libraryCacheManager: BlobLibraryCacheManager,
     protected val archive: ActorRef,
-    protected val defaultRestartStrategy: RestartStrategy,
+    protected val restartStrategyFactory: RestartStrategyFactory,
     protected val timeout: FiniteDuration,
     protected val leaderElectionService: LeaderElectionService,
     protected val submittedJobGraphs : SubmittedJobGraphStore,
@@ -210,7 +211,7 @@ class JobManager(
     log.info(s"Stopping JobManager $getAddress.")
 
     val newFuturesToComplete = cancelAndClearEverything(
-      new Exception("The JobManager is shutting down."),
+      new SuppressRestartsException(new Exception("The JobManager is shutting 
down.")),
       removeJobFromStateBackend = true)
 
     implicit val executionContext = context.dispatcher
@@ -307,7 +308,7 @@ class JobManager(
       log.info(s"JobManager ${self.path.toSerializationFormat} was revoked 
leadership.")
 
       val newFuturesToComplete = cancelAndClearEverything(
-        new Exception("JobManager is no longer the leader."),
+        new SuppressRestartsException(new Exception("JobManager is no longer 
the leader.")),
         removeJobFromStateBackend = false)
 
       futuresToComplete = Some(futuresToComplete.getOrElse(Seq()) ++ 
newFuturesToComplete)
@@ -1071,7 +1072,7 @@ class JobManager(
         val restartStrategy = 
Option(jobGraph.getExecutionConfig().getRestartStrategy())
           .map(RestartStrategyFactory.createRestartStrategy(_)) match {
             case Some(strategy) => strategy
-            case None => defaultRestartStrategy
+            case None => restartStrategyFactory.createRestartStrategy()
           }
 
         log.info(s"Using restart strategy $restartStrategy for $jobId.")
@@ -1629,7 +1630,7 @@ class JobManager(
     * @param cause Cause for the cancelling.
     */
   private def cancelAndClearEverything(
-      cause: Throwable,
+      cause: SuppressRestartsException,
       removeJobFromStateBackend: Boolean)
     : Seq[Future[Unit]] = {
     val futures = for ((jobID, (eg, jobInfo)) <- currentJobs) yield {
@@ -2265,7 +2266,7 @@ object JobManager {
     InstanceManager,
     FlinkScheduler,
     BlobLibraryCacheManager,
-    RestartStrategy,
+    RestartStrategyFactory,
     FiniteDuration, // timeout
     Int, // number of archived jobs
     LeaderElectionService,
@@ -2281,8 +2282,7 @@ object JobManager {
       ConfigConstants.LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL,
       ConfigConstants.DEFAULT_LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL) * 1000
 
-    val restartStrategy = RestartStrategyFactory
-      .createFromConfig(configuration)
+    val restartStrategy = 
RestartStrategyFactory.createRestartStrategyFactory(configuration)
 
     val archiveCount = 
configuration.getInteger(ConfigConstants.JOB_MANAGER_WEB_ARCHIVE_COUNT,
       ConfigConstants.DEFAULT_JOB_MANAGER_WEB_ARCHIVE_COUNT)

http://git-wip-us.apache.org/repos/asf/flink/blob/28c57c3a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 8e1b751..1c7815c 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -41,7 +41,7 @@ import 
org.apache.flink.runtime.accumulators.AccumulatorSnapshot
 import org.apache.flink.runtime.clusterframework.messages.StopCluster
 import org.apache.flink.runtime.clusterframework.types.ResourceID
 import org.apache.flink.runtime.akka.AkkaUtils
-import org.apache.flink.runtime.blob.{BlobKey, BlobClient, BlobCache, 
BlobService}
+import org.apache.flink.runtime.blob.{BlobClient, BlobCache, BlobService}
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager
 import org.apache.flink.runtime.deployment.{InputChannelDeploymentDescriptor, 
TaskDeploymentDescriptor}
 import org.apache.flink.runtime.execution.ExecutionState

http://git-wip-us.apache.org/repos/asf/flink/blob/28c57c3a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
index fe35c0d..afc46a7 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
@@ -192,7 +192,7 @@ public class JobManagerLeaderElectionTest extends 
TestLogger {
                                new 
Scheduler(TestingUtils.defaultExecutionContext()),
                                new BlobLibraryCacheManager(new 
BlobServer(configuration), 10L),
                                ActorRef.noSender(),
-                               new NoRestartStrategy(),
+                               new 
NoRestartStrategy.NoRestartStrategyFactory(),
                                AkkaUtils.getDefaultTimeout(),
                                leaderElectionService,
                                submittedJobGraphStore,

http://git-wip-us.apache.org/repos/asf/flink/blob/28c57c3a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.java
new file mode 100644
index 0000000..a2cefb6
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.leaderelection;
+
+import akka.actor.ActorRef;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import 
org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.instance.AkkaActorGateway;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobmanager.Tasks;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.messages.ExecutionGraphMessages;
+import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
+import org.apache.flink.util.TestLogger;
+import org.junit.Before;
+import org.junit.Test;
+import scala.concurrent.Await;
+import scala.concurrent.ExecutionContext;
+import scala.concurrent.Future;
+import scala.concurrent.Promise;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static org.junit.Assert.assertTrue;
+
+public class LeaderChangeJobRecoveryTest extends TestLogger {
+
+       private static FiniteDuration timeout = FiniteDuration.apply(30, 
TimeUnit.SECONDS);
+
+       private int numTMs = 1;
+       private int numSlotsPerTM = 1;
+       private int parallelism = numTMs * numSlotsPerTM;
+
+       private Configuration configuration;
+       private LeaderElectionRetrievalTestingCluster cluster = null;
+       private JobGraph job = createBlockingJob(parallelism);
+
+       @Before
+       public void before() throws TimeoutException, InterruptedException {
+               Tasks.BlockingOnceReceiver$.MODULE$.blocking_$eq(true);
+
+               configuration = new Configuration();
+
+               
configuration.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, 1);
+               
configuration.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTMs);
+               
configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 
numSlotsPerTM);
+
+               cluster = new 
LeaderElectionRetrievalTestingCluster(configuration, true, false, new 
FixedDelayRestartStrategy(9999, 100));
+               cluster.start(false);
+
+               // wait for actors to be alive so that they have started their 
leader retrieval service
+               cluster.waitForActorsToBeAlive();
+       }
+
+       /**
+        * Tests that the job is not restarted or at least terminates 
eventually in case that the
+        * JobManager loses its leadership.
+        *
+        * @throws Exception
+        */
+       @Test
+       public void testNotRestartedWhenLosingLeadership() throws Exception {
+               UUID leaderSessionID = UUID.randomUUID();
+
+               cluster.grantLeadership(0, leaderSessionID);
+               cluster.notifyRetrievalListeners(0, leaderSessionID);
+
+               cluster.waitForTaskManagersToBeRegistered(timeout);
+
+               cluster.submitJobDetached(job);
+
+               ActorGateway jm = cluster.getLeaderGateway(timeout);
+
+               Future<Object> wait = jm.ask(new 
TestingJobManagerMessages.WaitForAllVerticesToBeRunningOrFinished(job.getJobID()),
 timeout);
+
+               Await.ready(wait, timeout);
+
+               Future<Object> futureExecutionGraph = jm.ask(new 
TestingJobManagerMessages.RequestExecutionGraph(job.getJobID()), timeout);
+
+               TestingJobManagerMessages.ResponseExecutionGraph 
responseExecutionGraph =
+                       (TestingJobManagerMessages.ResponseExecutionGraph) 
Await.result(futureExecutionGraph, timeout);
+
+               assertTrue(responseExecutionGraph instanceof 
TestingJobManagerMessages.ExecutionGraphFound);
+
+               ExecutionGraph executionGraph = 
((TestingJobManagerMessages.ExecutionGraphFound) 
responseExecutionGraph).executionGraph();
+
+               TestActorGateway testActorGateway = new TestActorGateway();
+
+               executionGraph.registerJobStatusListener(testActorGateway);
+
+               cluster.revokeLeadership();
+
+               Future<Boolean> hasReachedTerminalState = 
testActorGateway.hasReachedTerminalState();
+
+               assertTrue("The job should have reached a terminal state.", 
Await.result(hasReachedTerminalState, timeout));
+       }
+
+       public JobGraph createBlockingJob(int parallelism) {
+               Tasks.BlockingOnceReceiver$.MODULE$.blocking_$eq(true);
+
+               JobVertex sender = new JobVertex("sender");
+               JobVertex receiver = new JobVertex("receiver");
+
+               sender.setInvokableClass(Tasks.Sender.class);
+               receiver.setInvokableClass(Tasks.BlockingOnceReceiver.class);
+
+               sender.setParallelism(parallelism);
+               receiver.setParallelism(parallelism);
+
+               receiver.connectNewDataSetAsInput(sender, 
DistributionPattern.POINTWISE);
+
+               SlotSharingGroup slotSharingGroup = new SlotSharingGroup();
+               sender.setSlotSharingGroup(slotSharingGroup);
+               receiver.setSlotSharingGroup(slotSharingGroup);
+
+               ExecutionConfig executionConfig = new ExecutionConfig();
+
+               return new JobGraph("Blocking test job", executionConfig, 
sender, receiver);
+       }
+
+       public static class TestActorGateway implements ActorGateway {
+
+               private static final long serialVersionUID = 
-736146686160538227L;
+               private transient Promise<Boolean> terminalState = new 
scala.concurrent.impl.Promise.DefaultPromise<>();
+
+               public Future<Boolean> hasReachedTerminalState() {
+                       return terminalState.future();
+               }
+
+               @Override
+               public Future<Object> ask(Object message, FiniteDuration 
timeout) {
+                       return null;
+               }
+
+               @Override
+               public void tell(Object message) {
+                       this.tell(message, new 
AkkaActorGateway(ActorRef.noSender(), null));
+               }
+
+               @Override
+               public void tell(Object message, ActorGateway sender) {
+                       if (message instanceof 
ExecutionGraphMessages.JobStatusChanged) {
+                               ExecutionGraphMessages.JobStatusChanged 
jobStatusChanged = (ExecutionGraphMessages.JobStatusChanged) message;
+
+                               if 
(jobStatusChanged.newJobStatus().isTerminalState()) {
+                                       terminalState.success(true);
+                               }
+                       }
+               }
+
+               @Override
+               public void forward(Object message, ActorGateway sender) {
+
+               }
+
+               @Override
+               public Future<Object> retry(Object message, int numberRetries, 
FiniteDuration timeout, ExecutionContext executionContext) {
+                       return null;
+               }
+
+               @Override
+               public String path() {
+                       return null;
+               }
+
+               @Override
+               public ActorRef actor() {
+                       return null;
+               }
+
+               @Override
+               public UUID leaderSessionID() {
+                       return null;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/28c57c3a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeStateCleanupTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeStateCleanupTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeStateCleanupTest.java
index f14d62f..6d938ac 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeStateCleanupTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeStateCleanupTest.java
@@ -68,7 +68,7 @@ public class LeaderChangeStateCleanupTest extends TestLogger {
                
configuration.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTMs);
                
configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 
numSlotsPerTM);
 
-               cluster = new 
LeaderElectionRetrievalTestingCluster(configuration, true, false);
+               cluster = new 
LeaderElectionRetrievalTestingCluster(configuration, true, false, null);
                cluster.start(false); // TaskManagers don't have to register at 
the JobManager
 
                cluster.waitForActorsToBeAlive(); // we only wait until all 
actors are alive

http://git-wip-us.apache.org/repos/asf/flink/blob/28c57c3a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionRetrievalTestingCluster.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionRetrievalTestingCluster.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionRetrievalTestingCluster.java
index c8cf868..cd89fa6 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionRetrievalTestingCluster.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionRetrievalTestingCluster.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.leaderelection;
 
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.testingUtils.TestingCluster;
 import scala.Option;
@@ -38,6 +39,7 @@ public class LeaderElectionRetrievalTestingCluster extends 
TestingCluster {
 
        private final Configuration userConfiguration;
        private final boolean useSingleActorSystem;
+       private final RestartStrategy restartStrategy;
 
        public List<TestingLeaderElectionService> leaderElectionServices;
        public List<TestingLeaderRetrievalService> leaderRetrievalServices;
@@ -47,7 +49,8 @@ public class LeaderElectionRetrievalTestingCluster extends 
TestingCluster {
        public LeaderElectionRetrievalTestingCluster(
                        Configuration userConfiguration,
                        boolean singleActorSystem,
-                       boolean synchronousDispatcher) {
+                       boolean synchronousDispatcher,
+                       RestartStrategy restartStrategy) {
                super(userConfiguration, singleActorSystem, 
synchronousDispatcher);
 
                this.userConfiguration = userConfiguration;
@@ -55,6 +58,8 @@ public class LeaderElectionRetrievalTestingCluster extends 
TestingCluster {
 
                leaderElectionServices = new 
ArrayList<TestingLeaderElectionService>();
                leaderRetrievalServices = new 
ArrayList<TestingLeaderRetrievalService>();
+
+               this.restartStrategy = restartStrategy;
        }
 
        @Override
@@ -90,6 +95,15 @@ public class LeaderElectionRetrievalTestingCluster extends 
TestingCluster {
                                
ConfigConstants.DEFAULT_LOCAL_NUMBER_JOB_MANAGER);
        }
 
+       @Override
+       public RestartStrategy getRestartStrategy(RestartStrategy other) {
+               if (this.restartStrategy != null) {
+                       return this.restartStrategy;
+               } else {
+                       return other;
+               }
+       }
+
        public void grantLeadership(int index, UUID leaderSessionID) {
                if(leaderIndex >= 0) {
                        // first revoke leadership
@@ -109,4 +123,11 @@ public class LeaderElectionRetrievalTestingCluster extends 
TestingCluster {
                        service.notifyListener(address, leaderSessionID);
                }
        }
+
+       public void revokeLeadership() {
+               if (leaderIndex >= 0) {
+                       leaderElectionServices.get(leaderIndex).notLeader();
+                       leaderIndex = -1;
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/28c57c3a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
index 5b08a45..0c4ffb9 100644
--- 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
+++ 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
@@ -26,11 +26,11 @@ import akka.testkit.CallingThreadDispatcher
 import org.apache.flink.configuration.{ConfigConstants, Configuration}
 import org.apache.flink.runtime.clusterframework.FlinkResourceManager
 import org.apache.flink.runtime.clusterframework.types.ResourceID
+import org.apache.flink.runtime.executiongraph.restart.RestartStrategy
 import org.apache.flink.runtime.jobmanager.JobManager
 import org.apache.flink.runtime.leaderelection.LeaderElectionService
 import org.apache.flink.runtime.minicluster.FlinkMiniCluster
 import org.apache.flink.runtime.testutils.TestingResourceManager
-import org.apache.flink.util.NetUtils
 import org.apache.flink.runtime.taskmanager.TaskManager
 import org.apache.flink.runtime.testingUtils.TestingMessages.Alive
 
@@ -100,7 +100,7 @@ class TestingCluster(
     instanceManager,
     scheduler,
     libraryCacheManager,
-    restartStrategy,
+    restartStrategyFactory,
     timeout,
     archiveCount,
     leaderElectionService,
@@ -122,7 +122,7 @@ class TestingCluster(
         scheduler,
         libraryCacheManager,
         archive,
-        restartStrategy,
+        restartStrategyFactory,
         timeout,
         leaderElectionService,
         submittedJobsGraphs,
@@ -186,6 +186,10 @@ class TestingCluster(
     None
   }
 
+  def getRestartStrategy(restartStrategy: RestartStrategy) = {
+    restartStrategy
+  }
+
   @throws(classOf[TimeoutException])
   @throws(classOf[InterruptedException])
   def waitForTaskManagersToBeAlive(): Unit = {

http://git-wip-us.apache.org/repos/asf/flink/blob/28c57c3a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
index 53867e0..e854b13 100644
--- 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
+++ 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
@@ -23,7 +23,7 @@ import akka.actor.ActorRef
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.runtime.checkpoint.{SavepointStore, 
CheckpointRecoveryFactory}
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
-import org.apache.flink.runtime.executiongraph.restart.RestartStrategy
+import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory
 import org.apache.flink.runtime.instance.InstanceManager
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler
 import org.apache.flink.runtime.jobmanager.{JobManager, SubmittedJobGraphStore}
@@ -44,7 +44,7 @@ class TestingJobManager(
     scheduler: Scheduler,
     libraryCacheManager: BlobLibraryCacheManager,
     archive: ActorRef,
-    restartStrategy: RestartStrategy,
+    restartStrategyFactory: RestartStrategyFactory,
     timeout: FiniteDuration,
     leaderElectionService: LeaderElectionService,
     submittedJobGraphs : SubmittedJobGraphStore,
@@ -58,7 +58,7 @@ class TestingJobManager(
     scheduler,
     libraryCacheManager,
     archive,
-    restartStrategy,
+    restartStrategyFactory,
     timeout,
     leaderElectionService,
     submittedJobGraphs,

http://git-wip-us.apache.org/repos/asf/flink/blob/28c57c3a/flink-yarn-tests/src/main/scala/org/apache/flink/yarn/TestingYarnJobManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-yarn-tests/src/main/scala/org/apache/flink/yarn/TestingYarnJobManager.scala
 
b/flink-yarn-tests/src/main/scala/org/apache/flink/yarn/TestingYarnJobManager.scala
index a6289d4..2f93785 100644
--- 
a/flink-yarn-tests/src/main/scala/org/apache/flink/yarn/TestingYarnJobManager.scala
+++ 
b/flink-yarn-tests/src/main/scala/org/apache/flink/yarn/TestingYarnJobManager.scala
@@ -24,7 +24,7 @@ import akka.actor.ActorRef
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.runtime.checkpoint.{SavepointStore, 
CheckpointRecoveryFactory}
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
-import org.apache.flink.runtime.executiongraph.restart.RestartStrategy
+import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory
 import org.apache.flink.runtime.instance.InstanceManager
 import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler
@@ -46,7 +46,7 @@ import scala.concurrent.duration.FiniteDuration
   * @param scheduler Scheduler to schedule Flink jobs
   * @param libraryCacheManager Manager to manage uploaded jar files
   * @param archive Archive for finished Flink jobs
-  * @param restartStrategy Default restart strategy for job restarts
+  * @param restartStrategyFactory Factory creating the default restart strategy for job restarts
   * @param timeout Timeout for futures
   * @param leaderElectionService LeaderElectionService to participate in the 
leader election
   */
@@ -57,7 +57,7 @@ class TestingYarnJobManager(
     scheduler: Scheduler,
     libraryCacheManager: BlobLibraryCacheManager,
     archive: ActorRef,
-    restartStrategy: RestartStrategy,
+    restartStrategyFactory: RestartStrategyFactory,
     timeout: FiniteDuration,
     leaderElectionService: LeaderElectionService,
     submittedJobGraphs : SubmittedJobGraphStore,
@@ -71,7 +71,7 @@ class TestingYarnJobManager(
     scheduler,
     libraryCacheManager,
     archive,
-    restartStrategy,
+    restartStrategyFactory,
     timeout,
     leaderElectionService,
     submittedJobGraphs,

http://git-wip-us.apache.org/repos/asf/flink/blob/28c57c3a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala 
b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
index 80520a1..f291c02 100644
--- a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
+++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
@@ -26,7 +26,7 @@ import org.apache.flink.api.common.JobID
 import org.apache.flink.configuration.{Configuration => FlinkConfiguration, 
ConfigConstants}
 import org.apache.flink.runtime.checkpoint.{SavepointStore, 
CheckpointRecoveryFactory}
 import org.apache.flink.runtime.clusterframework.ApplicationStatus
-import org.apache.flink.runtime.executiongraph.restart.RestartStrategy
+import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory
 import org.apache.flink.runtime.clusterframework.messages._
 import org.apache.flink.runtime.jobgraph.JobStatus
 import org.apache.flink.runtime.jobmanager.{SubmittedJobGraphStore, JobManager}
@@ -53,7 +53,7 @@ import scala.language.postfixOps
   * @param scheduler Scheduler to schedule Flink jobs
   * @param libraryCacheManager Manager to manage uploaded jar files
   * @param archive Archive for finished Flink jobs
-  * @param restartStrategy Restart strategy to be used in case of a job 
recovery
+  * @param restartStrategyFactory Factory creating the restart strategy to be used in case of a job recovery
   * @param timeout Timeout for futures
   * @param leaderElectionService LeaderElectionService to participate in the 
leader election
   */
@@ -64,7 +64,7 @@ class YarnJobManager(
     scheduler: FlinkScheduler,
     libraryCacheManager: BlobLibraryCacheManager,
     archive: ActorRef,
-    restartStrategy: RestartStrategy,
+    restartStrategyFactory: RestartStrategyFactory,
     timeout: FiniteDuration,
     leaderElectionService: LeaderElectionService,
     submittedJobGraphs : SubmittedJobGraphStore,
@@ -78,7 +78,7 @@ class YarnJobManager(
     scheduler,
     libraryCacheManager,
     archive,
-    restartStrategy,
+    restartStrategyFactory,
     timeout,
     leaderElectionService,
     submittedJobGraphs,

Reply via email to