Repository: flink Updated Branches: refs/heads/master e29ac036a -> 28c57c3a5
[FLINK-3800] [jobmanager] Terminate ExecutionGraphs properly This PR terminates the ExecutionGraphs properly without restarts when the JobManager calls cancelAndClearEverything. It is achieved by allowing the method to be called only with a SuppressRestartsException. The SuppressRestartsException will disable the restart strategy of the respective ExecutionGraph. This is important because the root cause could be a different exception. In order to avoid race conditions, the restart strategy has to be checked twice to see whether it allows restarting the job: once before and once after the job has transitioned to the state RESTARTING. This prevents ExecutionGraphs from becoming orphaned. Furthermore, this PR fixes the problem that the default restart strategy is shared by multiple jobs. The problem is solved by introducing a RestartStrategyFactory which creates for every job its own instance of a RestartStrategy. Fix LeaderChangeJobRecoveryTest case This closes #1923. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/28c57c3a Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/28c57c3a Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/28c57c3a Branch: refs/heads/master Commit: 28c57c3a57cbd7a02e756ee98c0b1168cec69feb Parents: e29ac03 Author: Till Rohrmann <trohrm...@apache.org> Authored: Thu Apr 21 17:07:51 2016 +0200 Committer: Till Rohrmann <trohrm...@apache.org> Committed: Tue Apr 26 19:04:36 2016 +0200 ---------------------------------------------------------------------- .../runtime/executiongraph/ExecutionGraph.java | 28 ++- .../restart/FixedDelayRestartStrategy.java | 30 ++- .../restart/NoRestartStrategy.java | 17 +- .../executiongraph/restart/RestartStrategy.java | 5 + .../restart/RestartStrategyFactory.java | 28 ++- .../flink/runtime/jobmanager/JobManager.scala | 16 +- .../flink/runtime/taskmanager/TaskManager.scala | 2 +- .../JobManagerLeaderElectionTest.java | 2 +- 
.../LeaderChangeJobRecoveryTest.java | 201 +++++++++++++++++++ .../LeaderChangeStateCleanupTest.java | 2 +- .../LeaderElectionRetrievalTestingCluster.java | 23 ++- .../runtime/testingUtils/TestingCluster.scala | 10 +- .../testingUtils/TestingJobManager.scala | 6 +- .../flink/yarn/TestingYarnJobManager.scala | 8 +- .../org/apache/flink/yarn/YarnJobManager.scala | 8 +- 15 files changed, 336 insertions(+), 50 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/28c57c3a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java index 9ee8ee5..3796402 100755 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java @@ -798,6 +798,14 @@ public class ExecutionGraph implements Serializable { } public void fail(Throwable t) { + if (t instanceof SuppressRestartsException) { + if (restartStrategy != null) { + // disable the restart strategy in case that we have seen a SuppressRestartsException + // it basically overrides the restart behaviour of a the root cause + restartStrategy.disable(); + } + } + while (true) { JobStatus current = state; if (current == JobStatus.FAILING || current.isTerminalState()) { @@ -1021,15 +1029,17 @@ public class ExecutionGraph implements Serializable { } } else if (current == JobStatus.FAILING) { - boolean allowRestart = !(failureCause instanceof SuppressRestartsException); - - if (allowRestart && restartStrategy.canRestart() && - transitionState(current, JobStatus.RESTARTING)) { - restartStrategy.restart(this); - break; - - } else if ((!allowRestart || 
!restartStrategy.canRestart()) && - transitionState(current, JobStatus.FAILED, failureCause)) { + if (restartStrategy.canRestart() && transitionState(current, JobStatus.RESTARTING)) { + // double check in case that in the meantime a SuppressRestartsException was thrown + if (restartStrategy.canRestart()) { + restartStrategy.restart(this); + break; + } else { + fail(new Exception("ExecutionGraph went into RESTARTING state but " + + "then the restart strategy was disabled.")); + } + + } else if (!restartStrategy.canRestart() && transitionState(current, JobStatus.FAILED, failureCause)) { postRunCleanup(); break; } http://git-wip-us.apache.org/repos/asf/flink/blob/28c57c3a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategy.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategy.java index d3c7eba..464b48e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategy.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategy.java @@ -41,6 +41,7 @@ public class FixedDelayRestartStrategy implements RestartStrategy { private final int maxNumberRestartAttempts; private final long delayBetweenRestartAttempts; private int currentRestartAttempt; + private boolean disabled = false; public FixedDelayRestartStrategy( int maxNumberRestartAttempts, @@ -60,7 +61,7 @@ public class FixedDelayRestartStrategy implements RestartStrategy { @Override public boolean canRestart() { - return currentRestartAttempt < maxNumberRestartAttempts; + return !disabled && currentRestartAttempt < maxNumberRestartAttempts; } @Override @@ -83,6 +84,11 @@ public class FixedDelayRestartStrategy implements 
RestartStrategy { }, executionGraph.getExecutionContext()); } + @Override + public void disable() { + disabled = true; + } + /** * Creates a FixedDelayRestartStrategy from the given Configuration. * @@ -90,7 +96,7 @@ public class FixedDelayRestartStrategy implements RestartStrategy { * @return Initialized instance of FixedDelayRestartStrategy * @throws Exception */ - public static FixedDelayRestartStrategy create(Configuration configuration) throws Exception { + public static FixedDelayRestartStrategyFactory createFactory(Configuration configuration) throws Exception { int maxAttempts = configuration.getInteger(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, 1); String timeoutString = configuration.getString( @@ -118,7 +124,7 @@ public class FixedDelayRestartStrategy implements RestartStrategy { } } - return new FixedDelayRestartStrategy(maxAttempts, delay); + return new FixedDelayRestartStrategyFactory(maxAttempts, delay); } @Override @@ -128,4 +134,22 @@ public class FixedDelayRestartStrategy implements RestartStrategy { ", delayBetweenRestartAttempts=" + delayBetweenRestartAttempts + ')'; } + + public static class FixedDelayRestartStrategyFactory extends RestartStrategyFactory { + + private static final long serialVersionUID = 6642934067762271950L; + + private final int maxAttempts; + private final long delay; + + public FixedDelayRestartStrategyFactory(int maxAttempts, long delay) { + this.maxAttempts = maxAttempts; + this.delay = delay; + } + + @Override + public RestartStrategy createRestartStrategy() { + return new FixedDelayRestartStrategy(maxAttempts, delay); + } + } } http://git-wip-us.apache.org/repos/asf/flink/blob/28c57c3a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/NoRestartStrategy.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/NoRestartStrategy.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/NoRestartStrategy.java index 8911a98..6cc5ee4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/NoRestartStrategy.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/NoRestartStrategy.java @@ -36,18 +36,31 @@ public class NoRestartStrategy implements RestartStrategy { throw new RuntimeException("NoRestartStrategy does not support restart."); } + @Override + public void disable() {} + /** * Creates a NoRestartStrategy instance. * * @param configuration Configuration object which is ignored * @return NoRestartStrategy instance */ - public static NoRestartStrategy create(Configuration configuration) { - return new NoRestartStrategy(); + public static NoRestartStrategyFactory createFactory(Configuration configuration) { + return new NoRestartStrategyFactory(); } @Override public String toString() { return "NoRestartStrategy"; } + + public static class NoRestartStrategyFactory extends RestartStrategyFactory { + + private static final long serialVersionUID = -1809462525812787862L; + + @Override + public RestartStrategy createRestartStrategy() { + return new NoRestartStrategy(); + } + } } http://git-wip-us.apache.org/repos/asf/flink/blob/28c57c3a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategy.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategy.java index 2880c01..c9e6277 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategy.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategy.java @@ -38,4 +38,9 @@ public interface RestartStrategy { * @param executionGraph 
The ExecutionGraph to be restarted */ void restart(ExecutionGraph executionGraph); + + /** + * Disables the restart strategy. + */ + void disable(); } http://git-wip-us.apache.org/repos/asf/flink/blob/28c57c3a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyFactory.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyFactory.java index 68d114e..e58d775 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyFactory.java @@ -25,12 +25,21 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import scala.concurrent.duration.Duration; +import java.io.Serializable; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; -public class RestartStrategyFactory { +public abstract class RestartStrategyFactory implements Serializable { + private static final long serialVersionUID = 7320252552640522191L; + private static final Logger LOG = LoggerFactory.getLogger(RestartStrategyFactory.class); - private static final String CREATE_METHOD = "create"; + private static final String CREATE_METHOD = "createFactory"; + + /** + * Factory method to create a restart strategy + * @return The created restart strategy + */ + public abstract RestartStrategy createRestartStrategy(); /** * Creates a {@link RestartStrategy} instance from the given {@link org.apache.flink.api.common.restartstrategy.RestartStrategies.RestartStrategyConfiguration}. @@ -58,11 +67,10 @@ public class RestartStrategyFactory { /** * Creates a {@link RestartStrategy} instance from the given {@link Configuration}. 
* - * @param configuration Configuration object containing the configuration values. * @return RestartStrategy instance * @throws Exception which indicates that the RestartStrategy could not be instantiated. */ - public static RestartStrategy createFromConfig(Configuration configuration) throws Exception { + public static RestartStrategyFactory createRestartStrategyFactory(Configuration configuration) throws Exception { String restartStrategyName = configuration.getString(ConfigConstants.RESTART_STRATEGY, "none").toLowerCase(); switch (restartStrategyName) { @@ -92,16 +100,16 @@ public class RestartStrategyFactory { } if (numberExecutionRetries > 0 && delay >= 0) { - return new FixedDelayRestartStrategy(numberExecutionRetries, delay); + return new FixedDelayRestartStrategy.FixedDelayRestartStrategyFactory(numberExecutionRetries, delay); } else { - return NoRestartStrategy.create(configuration); + return NoRestartStrategy.createFactory(configuration); } case "off": case "disable": - return NoRestartStrategy.create(configuration); + return NoRestartStrategy.createFactory(configuration); case "fixeddelay": case "fixed-delay": - return FixedDelayRestartStrategy.create(configuration); + return FixedDelayRestartStrategy.createFactory(configuration); default: try { Class<?> clazz = Class.forName(restartStrategyName); @@ -113,7 +121,7 @@ public class RestartStrategyFactory { Object result = method.invoke(null, configuration); if (result != null) { - return (RestartStrategy) result; + return (RestartStrategyFactory) result; } } } @@ -128,7 +136,7 @@ public class RestartStrategyFactory { } // fallback in case of an error - return NoRestartStrategy.create(configuration); + return NoRestartStrategy.createFactory(configuration); } } } http://git-wip-us.apache.org/repos/asf/flink/blob/28c57c3a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---------------------------------------------------------------------- diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala index 9f0482e..d8b8a01 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala @@ -28,6 +28,7 @@ import akka.actor._ import akka.pattern.ask import grizzled.slf4j.Logger +import org.apache.flink.api.common.restartstrategy.RestartStrategies.RestartStrategyConfiguration import org.apache.flink.api.common.{ExecutionConfig, JobID} import org.apache.flink.configuration.{ConfigConstants, Configuration, GlobalConfiguration} @@ -118,7 +119,7 @@ class JobManager( protected val scheduler: FlinkScheduler, protected val libraryCacheManager: BlobLibraryCacheManager, protected val archive: ActorRef, - protected val defaultRestartStrategy: RestartStrategy, + protected val restartStrategyFactory: RestartStrategyFactory, protected val timeout: FiniteDuration, protected val leaderElectionService: LeaderElectionService, protected val submittedJobGraphs : SubmittedJobGraphStore, @@ -210,7 +211,7 @@ class JobManager( log.info(s"Stopping JobManager $getAddress.") val newFuturesToComplete = cancelAndClearEverything( - new Exception("The JobManager is shutting down."), + new SuppressRestartsException(new Exception("The JobManager is shutting down.")), removeJobFromStateBackend = true) implicit val executionContext = context.dispatcher @@ -307,7 +308,7 @@ class JobManager( log.info(s"JobManager ${self.path.toSerializationFormat} was revoked leadership.") val newFuturesToComplete = cancelAndClearEverything( - new Exception("JobManager is no longer the leader."), + new SuppressRestartsException(new Exception("JobManager is no longer the leader.")), removeJobFromStateBackend = false) futuresToComplete = Some(futuresToComplete.getOrElse(Seq()) ++ newFuturesToComplete) @@ -1071,7 +1072,7 @@ class 
JobManager( val restartStrategy = Option(jobGraph.getExecutionConfig().getRestartStrategy()) .map(RestartStrategyFactory.createRestartStrategy(_)) match { case Some(strategy) => strategy - case None => defaultRestartStrategy + case None => restartStrategyFactory.createRestartStrategy() } log.info(s"Using restart strategy $restartStrategy for $jobId.") @@ -1629,7 +1630,7 @@ class JobManager( * @param cause Cause for the cancelling. */ private def cancelAndClearEverything( - cause: Throwable, + cause: SuppressRestartsException, removeJobFromStateBackend: Boolean) : Seq[Future[Unit]] = { val futures = for ((jobID, (eg, jobInfo)) <- currentJobs) yield { @@ -2265,7 +2266,7 @@ object JobManager { InstanceManager, FlinkScheduler, BlobLibraryCacheManager, - RestartStrategy, + RestartStrategyFactory, FiniteDuration, // timeout Int, // number of archived jobs LeaderElectionService, @@ -2281,8 +2282,7 @@ object JobManager { ConfigConstants.LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL, ConfigConstants.DEFAULT_LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL) * 1000 - val restartStrategy = RestartStrategyFactory - .createFromConfig(configuration) + val restartStrategy = RestartStrategyFactory.createRestartStrategyFactory(configuration) val archiveCount = configuration.getInteger(ConfigConstants.JOB_MANAGER_WEB_ARCHIVE_COUNT, ConfigConstants.DEFAULT_JOB_MANAGER_WEB_ARCHIVE_COUNT) http://git-wip-us.apache.org/repos/asf/flink/blob/28c57c3a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala index 8e1b751..1c7815c 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala @@ -41,7 +41,7 @@ 
import org.apache.flink.runtime.accumulators.AccumulatorSnapshot import org.apache.flink.runtime.clusterframework.messages.StopCluster import org.apache.flink.runtime.clusterframework.types.ResourceID import org.apache.flink.runtime.akka.AkkaUtils -import org.apache.flink.runtime.blob.{BlobKey, BlobClient, BlobCache, BlobService} +import org.apache.flink.runtime.blob.{BlobClient, BlobCache, BlobService} import org.apache.flink.runtime.broadcast.BroadcastVariableManager import org.apache.flink.runtime.deployment.{InputChannelDeploymentDescriptor, TaskDeploymentDescriptor} import org.apache.flink.runtime.execution.ExecutionState http://git-wip-us.apache.org/repos/asf/flink/blob/28c57c3a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java index fe35c0d..afc46a7 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java @@ -192,7 +192,7 @@ public class JobManagerLeaderElectionTest extends TestLogger { new Scheduler(TestingUtils.defaultExecutionContext()), new BlobLibraryCacheManager(new BlobServer(configuration), 10L), ActorRef.noSender(), - new NoRestartStrategy(), + new NoRestartStrategy.NoRestartStrategyFactory(), AkkaUtils.getDefaultTimeout(), leaderElectionService, submittedJobGraphStore, http://git-wip-us.apache.org/repos/asf/flink/blob/28c57c3a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.java ---------------------------------------------------------------------- diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.java new file mode 100644 index 0000000..a2cefb6 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.java @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.leaderelection; + +import akka.actor.ActorRef; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.executiongraph.ExecutionGraph; +import org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy; +import org.apache.flink.runtime.instance.ActorGateway; +import org.apache.flink.runtime.instance.AkkaActorGateway; +import org.apache.flink.runtime.jobgraph.DistributionPattern; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobmanager.Tasks; +import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; +import org.apache.flink.runtime.messages.ExecutionGraphMessages; +import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages; +import org.apache.flink.util.TestLogger; +import org.junit.Before; +import org.junit.Test; +import scala.concurrent.Await; +import scala.concurrent.ExecutionContext; +import scala.concurrent.Future; +import scala.concurrent.Promise; +import scala.concurrent.duration.FiniteDuration; + +import java.util.UUID; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import static org.junit.Assert.assertTrue; + +public class LeaderChangeJobRecoveryTest extends TestLogger { + + private static FiniteDuration timeout = FiniteDuration.apply(30, TimeUnit.SECONDS); + + private int numTMs = 1; + private int numSlotsPerTM = 1; + private int parallelism = numTMs * numSlotsPerTM; + + private Configuration configuration; + private LeaderElectionRetrievalTestingCluster cluster = null; + private JobGraph job = createBlockingJob(parallelism); + + @Before + public void before() throws TimeoutException, InterruptedException { + Tasks.BlockingOnceReceiver$.MODULE$.blocking_$eq(true); + + configuration = new Configuration(); + + 
configuration.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, 1); + configuration.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTMs); + configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlotsPerTM); + + cluster = new LeaderElectionRetrievalTestingCluster(configuration, true, false, new FixedDelayRestartStrategy(9999, 100)); + cluster.start(false); + + // wait for actors to be alive so that they have started their leader retrieval service + cluster.waitForActorsToBeAlive(); + } + + /** + * Tests that the job is not restarted or at least terminates eventually in case that the + * JobManager loses its leadership. + * + * @throws Exception + */ + @Test + public void testNotRestartedWhenLosingLeadership() throws Exception { + UUID leaderSessionID = UUID.randomUUID(); + + cluster.grantLeadership(0, leaderSessionID); + cluster.notifyRetrievalListeners(0, leaderSessionID); + + cluster.waitForTaskManagersToBeRegistered(timeout); + + cluster.submitJobDetached(job); + + ActorGateway jm = cluster.getLeaderGateway(timeout); + + Future<Object> wait = jm.ask(new TestingJobManagerMessages.WaitForAllVerticesToBeRunningOrFinished(job.getJobID()), timeout); + + Await.ready(wait, timeout); + + Future<Object> futureExecutionGraph = jm.ask(new TestingJobManagerMessages.RequestExecutionGraph(job.getJobID()), timeout); + + TestingJobManagerMessages.ResponseExecutionGraph responseExecutionGraph = + (TestingJobManagerMessages.ResponseExecutionGraph) Await.result(futureExecutionGraph, timeout); + + assertTrue(responseExecutionGraph instanceof TestingJobManagerMessages.ExecutionGraphFound); + + ExecutionGraph executionGraph = ((TestingJobManagerMessages.ExecutionGraphFound) responseExecutionGraph).executionGraph(); + + TestActorGateway testActorGateway = new TestActorGateway(); + + executionGraph.registerJobStatusListener(testActorGateway); + + cluster.revokeLeadership(); + + Future<Boolean> hasReachedTerminalState = 
testActorGateway.hasReachedTerminalState(); + + assertTrue("The job should have reached a terminal state.", Await.result(hasReachedTerminalState, timeout)); + } + + public JobGraph createBlockingJob(int parallelism) { + Tasks.BlockingOnceReceiver$.MODULE$.blocking_$eq(true); + + JobVertex sender = new JobVertex("sender"); + JobVertex receiver = new JobVertex("receiver"); + + sender.setInvokableClass(Tasks.Sender.class); + receiver.setInvokableClass(Tasks.BlockingOnceReceiver.class); + + sender.setParallelism(parallelism); + receiver.setParallelism(parallelism); + + receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE); + + SlotSharingGroup slotSharingGroup = new SlotSharingGroup(); + sender.setSlotSharingGroup(slotSharingGroup); + receiver.setSlotSharingGroup(slotSharingGroup); + + ExecutionConfig executionConfig = new ExecutionConfig(); + + return new JobGraph("Blocking test job", executionConfig, sender, receiver); + } + + public static class TestActorGateway implements ActorGateway { + + private static final long serialVersionUID = -736146686160538227L; + private transient Promise<Boolean> terminalState = new scala.concurrent.impl.Promise.DefaultPromise<>(); + + public Future<Boolean> hasReachedTerminalState() { + return terminalState.future(); + } + + @Override + public Future<Object> ask(Object message, FiniteDuration timeout) { + return null; + } + + @Override + public void tell(Object message) { + this.tell(message, new AkkaActorGateway(ActorRef.noSender(), null)); + } + + @Override + public void tell(Object message, ActorGateway sender) { + if (message instanceof ExecutionGraphMessages.JobStatusChanged) { + ExecutionGraphMessages.JobStatusChanged jobStatusChanged = (ExecutionGraphMessages.JobStatusChanged) message; + + if (jobStatusChanged.newJobStatus().isTerminalState()) { + terminalState.success(true); + } + } + } + + @Override + public void forward(Object message, ActorGateway sender) { + + } + + @Override + public Future<Object> 
retry(Object message, int numberRetries, FiniteDuration timeout, ExecutionContext executionContext) { + return null; + } + + @Override + public String path() { + return null; + } + + @Override + public ActorRef actor() { + return null; + } + + @Override + public UUID leaderSessionID() { + return null; + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/28c57c3a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeStateCleanupTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeStateCleanupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeStateCleanupTest.java index f14d62f..6d938ac 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeStateCleanupTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeStateCleanupTest.java @@ -68,7 +68,7 @@ public class LeaderChangeStateCleanupTest extends TestLogger { configuration.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTMs); configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlotsPerTM); - cluster = new LeaderElectionRetrievalTestingCluster(configuration, true, false); + cluster = new LeaderElectionRetrievalTestingCluster(configuration, true, false, null); cluster.start(false); // TaskManagers don't have to register at the JobManager cluster.waitForActorsToBeAlive(); // we only wait until all actors are alive http://git-wip-us.apache.org/repos/asf/flink/blob/28c57c3a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionRetrievalTestingCluster.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionRetrievalTestingCluster.java 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionRetrievalTestingCluster.java index c8cf868..cd89fa6 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionRetrievalTestingCluster.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionRetrievalTestingCluster.java @@ -20,6 +20,7 @@ package org.apache.flink.runtime.leaderelection; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.executiongraph.restart.RestartStrategy; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; import org.apache.flink.runtime.testingUtils.TestingCluster; import scala.Option; @@ -38,6 +39,7 @@ public class LeaderElectionRetrievalTestingCluster extends TestingCluster { private final Configuration userConfiguration; private final boolean useSingleActorSystem; + private final RestartStrategy restartStrategy; public List<TestingLeaderElectionService> leaderElectionServices; public List<TestingLeaderRetrievalService> leaderRetrievalServices; @@ -47,7 +49,8 @@ public class LeaderElectionRetrievalTestingCluster extends TestingCluster { public LeaderElectionRetrievalTestingCluster( Configuration userConfiguration, boolean singleActorSystem, - boolean synchronousDispatcher) { + boolean synchronousDispatcher, + RestartStrategy restartStrategy) { super(userConfiguration, singleActorSystem, synchronousDispatcher); this.userConfiguration = userConfiguration; @@ -55,6 +58,8 @@ public class LeaderElectionRetrievalTestingCluster extends TestingCluster { leaderElectionServices = new ArrayList<TestingLeaderElectionService>(); leaderRetrievalServices = new ArrayList<TestingLeaderRetrievalService>(); + + this.restartStrategy = restartStrategy; } @Override @@ -90,6 +95,15 @@ public class LeaderElectionRetrievalTestingCluster extends TestingCluster { 
ConfigConstants.DEFAULT_LOCAL_NUMBER_JOB_MANAGER); } + @Override + public RestartStrategy getRestartStrategy(RestartStrategy other) { + if (this.restartStrategy != null) { + return this.restartStrategy; + } else { + return other; + } + } + public void grantLeadership(int index, UUID leaderSessionID) { if(leaderIndex >= 0) { // first revoke leadership @@ -109,4 +123,11 @@ public class LeaderElectionRetrievalTestingCluster extends TestingCluster { service.notifyListener(address, leaderSessionID); } } + + public void revokeLeadership() { + if (leaderIndex >= 0) { + leaderElectionServices.get(leaderIndex).notLeader(); + leaderIndex = -1; + } + } } http://git-wip-us.apache.org/repos/asf/flink/blob/28c57c3a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala index 5b08a45..0c4ffb9 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala @@ -26,11 +26,11 @@ import akka.testkit.CallingThreadDispatcher import org.apache.flink.configuration.{ConfigConstants, Configuration} import org.apache.flink.runtime.clusterframework.FlinkResourceManager import org.apache.flink.runtime.clusterframework.types.ResourceID +import org.apache.flink.runtime.executiongraph.restart.RestartStrategy import org.apache.flink.runtime.jobmanager.JobManager import org.apache.flink.runtime.leaderelection.LeaderElectionService import org.apache.flink.runtime.minicluster.FlinkMiniCluster import org.apache.flink.runtime.testutils.TestingResourceManager -import org.apache.flink.util.NetUtils import org.apache.flink.runtime.taskmanager.TaskManager import 
org.apache.flink.runtime.testingUtils.TestingMessages.Alive @@ -100,7 +100,7 @@ class TestingCluster( instanceManager, scheduler, libraryCacheManager, - restartStrategy, + restartStrategyFactory, timeout, archiveCount, leaderElectionService, @@ -122,7 +122,7 @@ class TestingCluster( scheduler, libraryCacheManager, archive, - restartStrategy, + restartStrategyFactory, timeout, leaderElectionService, submittedJobsGraphs, @@ -186,6 +186,10 @@ class TestingCluster( None } + def getRestartStrategy(restartStrategy: RestartStrategy) = { + restartStrategy + } + @throws(classOf[TimeoutException]) @throws(classOf[InterruptedException]) def waitForTaskManagersToBeAlive(): Unit = { http://git-wip-us.apache.org/repos/asf/flink/blob/28c57c3a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala index 53867e0..e854b13 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala @@ -23,7 +23,7 @@ import akka.actor.ActorRef import org.apache.flink.configuration.Configuration import org.apache.flink.runtime.checkpoint.{SavepointStore, CheckpointRecoveryFactory} import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager -import org.apache.flink.runtime.executiongraph.restart.RestartStrategy +import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory import org.apache.flink.runtime.instance.InstanceManager import org.apache.flink.runtime.jobmanager.scheduler.Scheduler import org.apache.flink.runtime.jobmanager.{JobManager, SubmittedJobGraphStore} @@ -44,7 +44,7 @@ class TestingJobManager( scheduler: Scheduler, 
libraryCacheManager: BlobLibraryCacheManager, archive: ActorRef, - restartStrategy: RestartStrategy, + restartStrategyFactory: RestartStrategyFactory, timeout: FiniteDuration, leaderElectionService: LeaderElectionService, submittedJobGraphs : SubmittedJobGraphStore, @@ -58,7 +58,7 @@ class TestingJobManager( scheduler, libraryCacheManager, archive, - restartStrategy, + restartStrategyFactory, timeout, leaderElectionService, submittedJobGraphs, http://git-wip-us.apache.org/repos/asf/flink/blob/28c57c3a/flink-yarn-tests/src/main/scala/org/apache/flink/yarn/TestingYarnJobManager.scala ---------------------------------------------------------------------- diff --git a/flink-yarn-tests/src/main/scala/org/apache/flink/yarn/TestingYarnJobManager.scala b/flink-yarn-tests/src/main/scala/org/apache/flink/yarn/TestingYarnJobManager.scala index a6289d4..2f93785 100644 --- a/flink-yarn-tests/src/main/scala/org/apache/flink/yarn/TestingYarnJobManager.scala +++ b/flink-yarn-tests/src/main/scala/org/apache/flink/yarn/TestingYarnJobManager.scala @@ -24,7 +24,7 @@ import akka.actor.ActorRef import org.apache.flink.configuration.Configuration import org.apache.flink.runtime.checkpoint.{SavepointStore, CheckpointRecoveryFactory} import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager -import org.apache.flink.runtime.executiongraph.restart.RestartStrategy +import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory import org.apache.flink.runtime.instance.InstanceManager import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore import org.apache.flink.runtime.jobmanager.scheduler.Scheduler @@ -46,7 +46,7 @@ import scala.concurrent.duration.FiniteDuration * @param scheduler Scheduler to schedule Flink jobs * @param libraryCacheManager Manager to manage uploaded jar files * @param archive Archive for finished Flink jobs - * @param restartStrategy Default restart strategy for job restarts + * @param restartStrategyFactory Default restart 
strategy factory for job restarts * @param timeout Timeout for futures * @param leaderElectionService LeaderElectionService to participate in the leader election */ @@ -57,7 +57,7 @@ class TestingYarnJobManager( scheduler: Scheduler, libraryCacheManager: BlobLibraryCacheManager, archive: ActorRef, - restartStrategy: RestartStrategy, + restartStrategyFactory: RestartStrategyFactory, timeout: FiniteDuration, leaderElectionService: LeaderElectionService, submittedJobGraphs : SubmittedJobGraphStore, @@ -71,7 +71,7 @@ class TestingYarnJobManager( scheduler, libraryCacheManager, archive, - restartStrategy, + restartStrategyFactory, timeout, leaderElectionService, submittedJobGraphs, http://git-wip-us.apache.org/repos/asf/flink/blob/28c57c3a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala index 80520a1..f291c02 100644 --- a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala +++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala @@ -26,7 +26,7 @@ import org.apache.flink.api.common.JobID import org.apache.flink.configuration.{Configuration => FlinkConfiguration, ConfigConstants} import org.apache.flink.runtime.checkpoint.{SavepointStore, CheckpointRecoveryFactory} import org.apache.flink.runtime.clusterframework.ApplicationStatus -import org.apache.flink.runtime.executiongraph.restart.RestartStrategy +import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory import org.apache.flink.runtime.clusterframework.messages._ import org.apache.flink.runtime.jobgraph.JobStatus import org.apache.flink.runtime.jobmanager.{SubmittedJobGraphStore, JobManager} @@ -53,7 +53,7 @@ import scala.language.postfixOps * @param scheduler Scheduler to schedule Flink jobs * @param libraryCacheManager Manager
to manage uploaded jar files * @param archive Archive for finished Flink jobs - * @param restartStrategy Restart strategy to be used in case of a job recovery + * @param restartStrategyFactory Factory for the restart strategy to be used in case of a job recovery * @param timeout Timeout for futures * @param leaderElectionService LeaderElectionService to participate in the leader election */ @@ -64,7 +64,7 @@ class YarnJobManager( scheduler: FlinkScheduler, libraryCacheManager: BlobLibraryCacheManager, archive: ActorRef, - restartStrategy: RestartStrategy, + restartStrategyFactory: RestartStrategyFactory, timeout: FiniteDuration, leaderElectionService: LeaderElectionService, submittedJobGraphs : SubmittedJobGraphStore, @@ -78,7 +78,7 @@ class YarnJobManager( scheduler, libraryCacheManager, archive, - restartStrategy, + restartStrategyFactory, timeout, leaderElectionService, submittedJobGraphs,