[FLINK-3260] [runtime] Enforce terminal state of Executions This commit fixes the problem that Executions could leave their terminal state FINISHED and transition to FAILED. Such a transition is propagated to the ExecutionGraph, where it entails JobStatus changes. Since the Execution has already reached a terminal state, it should no longer affect the ExecutionGraph. Otherwise, this can lead to an inconsistent state in case of a restart, where the old Executions get disassociated from the ExecutionGraph.
This closes #1613 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6968a57a Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6968a57a Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6968a57a Branch: refs/heads/tableOnCalcite Commit: 6968a57a1a31a11b33bacd2c94d6559bcabd6eb9 Parents: 48b7454 Author: Till Rohrmann <trohrm...@apache.org> Authored: Tue Feb 9 10:30:12 2016 +0100 Committer: Stephan Ewen <se...@apache.org> Committed: Wed Feb 10 15:34:37 2016 +0100 ---------------------------------------------------------------------- .../flink/runtime/executiongraph/Execution.java | 14 +- .../ExecutionGraphRestartTest.java | 90 +++++++++++++ .../runtime/testingUtils/TestingCluster.scala | 6 +- .../testingUtils/TestingTaskManagerLike.scala | 4 +- .../runtime/testingUtils/TestingUtils.scala | 133 ++++++++++++++++++- 5 files changed, 233 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/6968a57a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java index eb2e68c..db037bb 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java @@ -435,7 +435,7 @@ public class Execution implements Serializable { return; } else if (current == CREATED || current == SCHEDULED) { - // from here, we can directly switch to cancelled, because the no task has been deployed + // from here, we can directly switch to cancelled, because no task has been deployed if (transitionState(current, CANCELED)) { // we skip the 
canceling state. set the timestamp, for a consistent appearance @@ -754,11 +754,10 @@ public class Execution implements Serializable { return false; } - if (current == CANCELED) { - // we are already aborting or are already aborted + if (current == CANCELED || current == FINISHED) { + // we are already aborting or are already aborted or we are already finished if (LOG.isDebugEnabled()) { - LOG.debug(String.format("Ignoring transition of vertex %s to %s while being %s", - getVertexWithAttempt(), FAILED, CANCELED)); + LOG.debug("Ignoring transition of vertex {} to {} while being {}.", getVertexWithAttempt(), FAILED, current); } return false; } @@ -928,6 +927,11 @@ public class Execution implements Serializable { } private boolean transitionState(ExecutionState currentState, ExecutionState targetState, Throwable error) { + // sanity check + if (currentState.isTerminal()) { + throw new IllegalStateException("Cannot leave terminal state " + currentState + " to transition to " + targetState + "."); + } + if (STATE_UPDATER.compareAndSet(this, currentState, targetState)) { markTimestamp(targetState); http://git-wip-us.apache.org/repos/asf/flink/blob/6968a57a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java index 0c3af8f..47a48a2 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java @@ -36,6 +36,7 @@ import org.junit.Test; import scala.concurrent.duration.Deadline; import scala.concurrent.duration.FiniteDuration; +import java.util.Iterator; import java.util.concurrent.TimeUnit; import 
static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.SimpleActorGateway; @@ -387,6 +388,95 @@ public class ExecutionGraphRestartTest { assertEquals(1, eg.getNumberOfRetriesLeft()); } + /** + * Tests that a failing execution does not affect a restarted job. This is important if a + * callback handler fails an execution after it has already reached a final state and the job + * has been restarted. + */ + @Test + public void testFailingExecutionAfterRestart() throws Exception { + Instance instance = ExecutionGraphTestUtils.getInstance( + new SimpleActorGateway(TestingUtils.directExecutionContext()), + 2); + + Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext()); + scheduler.newInstanceAvailable(instance); + + JobVertex sender = new JobVertex("Task1"); + sender.setInvokableClass(Tasks.NoOpInvokable.class); + sender.setParallelism(1); + + JobVertex receiver = new JobVertex("Task2"); + receiver.setInvokableClass(Tasks.NoOpInvokable.class); + receiver.setParallelism(1); + + JobGraph jobGraph = new JobGraph("Pointwise job", sender, receiver); + + ExecutionGraph eg = new ExecutionGraph( + TestingUtils.defaultExecutionContext(), + new JobID(), + "test job", + new Configuration(), + AkkaUtils.getDefaultTimeout()); + eg.setNumberOfRetriesLeft(1); + eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources()); + + assertEquals(JobStatus.CREATED, eg.getState()); + + eg.scheduleForExecution(scheduler); + assertEquals(JobStatus.RUNNING, eg.getState()); + + Iterator<ExecutionVertex> executionVertices = eg.getAllExecutionVertices().iterator(); + + Execution finishedExecution = executionVertices.next().getCurrentExecutionAttempt(); + Execution failedExecution = executionVertices.next().getCurrentExecutionAttempt(); + + finishedExecution.markFinished(); + failedExecution.fail(new Exception("Test Exception")); + + failedExecution.cancelingComplete(); + + FiniteDuration timeout = new FiniteDuration(2, TimeUnit.MINUTES); + + Deadline 
deadline = timeout.fromNow(); + + while (deadline.hasTimeLeft() && eg.getState() != JobStatus.RUNNING) { + Thread.sleep(100); + } + + assertEquals(JobStatus.RUNNING, eg.getState()); + + // Wait for deploying after async restart + deadline = timeout.fromNow(); + boolean success = false; + + while (deadline.hasTimeLeft() && !success) { + success = true; + + for (ExecutionVertex vertex : eg.getAllExecutionVertices()) { + if (vertex.getCurrentExecutionAttempt().getAssignedResource() == null) { + success = false; + Thread.sleep(100); + break; + } else { + vertex.getCurrentExecutionAttempt().switchToRunning(); + } + } + } + + // fail old finished execution, this should not affect the execution + finishedExecution.fail(new Exception("This should have no effect")); + + for (ExecutionVertex vertex: eg.getAllExecutionVertices()) { + vertex.getCurrentExecutionAttempt().markFinished(); + } + + // the state of the finished execution should have not changed since it is terminal + assertEquals(ExecutionState.FINISHED, finishedExecution.getState()); + + assertEquals(JobStatus.FINISHED, eg.getState()); + } + private static void restartAfterFailure(ExecutionGraph eg, FiniteDuration timeout, boolean haltAfterRestart) throws InterruptedException { eg.getAllExecutionVertices().iterator().next().fail(new Exception("Test Exception")); http://git-wip-us.apache.org/repos/asf/flink/blob/6968a57a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala index bd56040..22b0d29 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala @@ -42,9 +42,9 @@ import 
scala.concurrent.{Await, Future} * otherwise false */ class TestingCluster( - userConfiguration: Configuration, - singleActorSystem: Boolean, - synchronousDispatcher: Boolean) + userConfiguration: Configuration, + singleActorSystem: Boolean, + synchronousDispatcher: Boolean) extends FlinkMiniCluster( userConfiguration, singleActorSystem) { http://git-wip-us.apache.org/repos/asf/flink/blob/6968a57a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerLike.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerLike.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerLike.scala index c10e83e..e9dbdde 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerLike.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerLike.scala @@ -21,7 +21,6 @@ package org.apache.flink.runtime.testingUtils import akka.actor.{Terminated, ActorRef} import org.apache.flink.api.common.JobID import org.apache.flink.runtime.FlinkActor -import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor import org.apache.flink.runtime.execution.ExecutionState import org.apache.flink.runtime.executiongraph.ExecutionAttemptID import org.apache.flink.runtime.messages.JobManagerMessages.{ResponseLeaderSessionID, @@ -32,8 +31,7 @@ AcknowledgeRegistration} import org.apache.flink.runtime.messages.TaskMessages.{SubmitTask, UpdateTaskExecutionState, TaskInFinalState} import org.apache.flink.runtime.taskmanager.TaskManager import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.NotifyWhenJobRemoved -import org.apache.flink.runtime.testingUtils.TestingMessages.{DisableDisconnect, -CheckIfJobRemoved, Alive} +import org.apache.flink.runtime.testingUtils.TestingMessages.{DisableDisconnect, CheckIfJobRemoved, Alive} 
import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages._ import scala.concurrent.duration._ http://git-wip-us.apache.org/repos/asf/flink/blob/6968a57a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala index 98faa34..679dc71 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala @@ -26,8 +26,11 @@ import com.google.common.util.concurrent.MoreExecutors import com.typesafe.config.ConfigFactory import grizzled.slf4j.Logger +import org.apache.flink.api.common.JobExecutionResult import org.apache.flink.configuration.{ConfigConstants, Configuration} +import org.apache.flink.runtime.client.JobClient +import org.apache.flink.runtime.jobgraph.JobGraph import org.apache.flink.runtime.jobmanager.{MemoryArchivist, JobManager} import org.apache.flink.runtime.{LogMessages, LeaderSessionMessageFilter, FlinkActor} import org.apache.flink.runtime.akka.AkkaUtils @@ -168,6 +171,26 @@ object TestingUtils { } def createTaskManager( + actorSystem: ActorSystem, + jobManager: ActorGateway, + configuration: Configuration, + useLocalCommunication: Boolean, + waitForRegistration: Boolean, + taskManagerClass: Class[_ <: TaskManager]) + : ActorGateway = { + val jobManagerURL = AkkaUtils.getAkkaURL(actorSystem, jobManager.actor) + + createTaskManager( + actorSystem, + jobManagerURL, + configuration, + useLocalCommunication, + waitForRegistration, + taskManagerClass + ) + } + + def createTaskManager( actorSystem: ActorSystem, jobManager: ActorGateway, configuration: Configuration, @@ -200,11 +223,30 @@ object TestingUtils { * @return ActorGateway of the 
created TaskManager */ def createTaskManager( + actorSystem: ActorSystem, + jobManagerURL: String, + configuration: Configuration, + useLocalCommunication: Boolean, + waitForRegistration: Boolean) + : ActorGateway = { + createTaskManager( + actorSystem, + jobManagerURL, + configuration, + useLocalCommunication, + waitForRegistration, + classOf[TestingTaskManager] + ) + } + + + def createTaskManager( actorSystem: ActorSystem, jobManagerURL: String, configuration: Configuration, useLocalCommunication: Boolean, - waitForRegistration: Boolean) + waitForRegistration: Boolean, + taskManagerClass: Class[_ <: TaskManager]) : ActorGateway = { val resultingConfiguration = new Configuration() @@ -222,7 +264,7 @@ object TestingUtils { None, leaderRetrievalService, useLocalCommunication, - classOf[TestingTaskManager] + taskManagerClass ) if (waitForRegistration) { @@ -264,6 +306,72 @@ object TestingUtils { actorSystem: ActorSystem, configuration: Configuration) : ActorGateway = { + createJobManager( + actorSystem, + configuration, + classOf[TestingJobManager] + ) + } + + def createJobManager( + actorSystem: ActorSystem, + configuration: Configuration, + executionContext: ExecutionContext) + : ActorGateway = { + + val (_, + instanceManager, + scheduler, + libraryCacheManager, + executionRetries, + delayBetweenRetries, + timeout, + archiveCount, + leaderElectionService, + submittedJobGraphs, + checkpointRecoveryFactory) = JobManager.createJobManagerComponents( + configuration, + None + ) + + val archiveProps = Props(classOf[TestingMemoryArchivist], archiveCount) + + val archive: ActorRef = actorSystem.actorOf(archiveProps, JobManager.ARCHIVE_NAME) + + val jobManagerProps = Props( + classOf[TestingJobManager], + configuration, + executionContext, + instanceManager, + scheduler, + libraryCacheManager, + archive, + executionRetries, + delayBetweenRetries, + timeout, + leaderElectionService, + submittedJobGraphs, + checkpointRecoveryFactory) + + val jobManager: ActorRef = 
actorSystem.actorOf(jobManagerProps, JobManager.JOB_MANAGER_NAME) + + new AkkaActorGateway(jobManager, null) + } + + + /** + * Creates a JobManager of the given class using the default recovery mode (standalone) + * + * @param actorSystem ActorSystem to use + * @param configuration Configuration to use + * @param jobManagerClass JobManager class to instantiate + * @return + */ + def createJobManager( + actorSystem: ActorSystem, + configuration: Configuration, + jobManagerClass: Class[_ <: JobManager]) + : ActorGateway = { configuration.setString(ConfigConstants.RECOVERY_MODE, ConfigConstants.DEFAULT_RECOVERY_MODE) @@ -272,7 +380,7 @@ object TestingUtils { actorSystem, Some(JobManager.JOB_MANAGER_NAME), Some(JobManager.ARCHIVE_NAME), - classOf[TestingJobManager], + jobManagerClass, classOf[MemoryArchivist]) new AkkaActorGateway(actor, null) @@ -312,6 +420,25 @@ object TestingUtils { new AkkaActorGateway(actor, null) } + def submitJobAndWait( + actorSystem: ActorSystem, + jobManager: ActorGateway, + jobGraph: JobGraph) + : JobExecutionResult = { + + val jobManagerURL = AkkaUtils.getAkkaURL(actorSystem, jobManager.actor) + val leaderRetrievalService = new StandaloneLeaderRetrievalService(jobManagerURL) + + JobClient.submitJobAndWait( + actorSystem, + leaderRetrievalService, + jobGraph, + TESTING_DURATION, + false, + Thread.currentThread().getContextClassLoader + ) + } + class ForwardingActor(val target: ActorRef, val leaderSessionID: Option[UUID]) extends FlinkActor with LeaderSessionMessageFilter with LogMessages {