[FLINK-2619] [tests] Fix failing ExecutionGraphRestartTest and JobManagerRegistrationTest
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/95d035ab Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/95d035ab Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/95d035ab Branch: refs/heads/master Commit: 95d035ab363d13a53ec37894678c9a6a4896e9dd Parents: c9edd9a Author: Stephan Ewen <se...@apache.org> Authored: Wed Sep 9 16:00:25 2015 +0200 Committer: Stephan Ewen <se...@apache.org> Committed: Wed Sep 9 19:10:51 2015 +0200 ---------------------------------------------------------------------- .../runtime/executiongraph/ExecutionGraph.java | 31 +-- .../executiongraph/ExecutionGraphTestUtils.java | 4 + .../ExecutionGraphRestartTest.scala | 34 ++-- .../jobmanager/JobManagerRegistrationTest.scala | 187 +++++++++---------- 4 files changed, 138 insertions(+), 118 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/95d035ab/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java index cde1741..a44fc82 100755 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java @@ -857,20 +857,25 @@ public class ExecutionGraph implements Serializable { else if (current == JobStatus.FAILING) { if (numberOfRetriesLeft > 0 && transitionState(current, JobStatus.RESTARTING)) { numberOfRetriesLeft--; - future(new Callable<Object>() { - @Override - public Object call() throws Exception { - try { - LOG.info("Delaying retry of job execution for {} ms ...", delayBeforeRetrying); - Thread.sleep(delayBeforeRetrying); + + if (delayBeforeRetrying > 0) { + future(new Callable<Object>() { + @Override + public Object call() throws Exception { + try { + LOG.info("Delaying retry of job execution for {} ms ...", delayBeforeRetrying); + Thread.sleep(delayBeforeRetrying); + } + catch(InterruptedException e){ + // should only happen on shutdown + } + restart(); + return null; } - catch(InterruptedException e){ - // should only happen on shutdown - } - restart(); - return null; - } - }, executionContext); + }, executionContext); + } else { + restart(); + } break; } else if (numberOfRetriesLeft <= 0 && transitionState(current, JobStatus.FAILED, failureCause)) { http://git-wip-us.apache.org/repos/asf/flink/blob/95d035ab/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java index 64d4c44..ad30b9e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java @@ -112,7 +112,9 @@ public class ExecutionGraphTestUtils { return new Instance(gateway, connection, new InstanceID(), hardwareDescription, numberOfSlots); } + @SuppressWarnings("serial") public static class SimpleActorGateway extends BaseTestingActorGateway { + public TaskDeploymentDescriptor lastTDD; public SimpleActorGateway(ExecutionContext executionContext){ @@ -139,7 +141,9 @@ public class ExecutionGraphTestUtils { } } + @SuppressWarnings("serial") public static class SimpleFailingActorGateway extends BaseTestingActorGateway { + public SimpleFailingActorGateway(ExecutionContext executionContext) { super(executionContext); } http://git-wip-us.apache.org/repos/asf/flink/blob/95d035ab/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.scala index 434a8cb..d1b8fac 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.scala @@ -26,10 +26,13 @@ import org.apache.flink.runtime.jobgraph.{JobStatus, JobGraph, JobVertex} import org.apache.flink.runtime.jobmanager.Tasks import org.apache.flink.runtime.jobmanager.scheduler.Scheduler import org.apache.flink.runtime.testingUtils.TestingUtils + import org.junit.runner.RunWith import org.scalatest.junit.JUnitRunner import org.scalatest.{Matchers, WordSpecLike} +import scala.collection.JavaConverters._ + @RunWith(classOf[JUnitRunner]) class ExecutionGraphRestartTest extends WordSpecLike with Matchers { @@ -39,7 +42,8 @@ class ExecutionGraphRestartTest extends WordSpecLike with Matchers { "be manually restartable" in { try { val instance = ExecutionGraphTestUtils.getInstance( - new SimpleActorGateway(TestingUtils.directExecutionContext)) + new SimpleActorGateway(TestingUtils.directExecutionContext), + NUM_TASKS) val scheduler = new Scheduler(TestingUtils.defaultExecutionContext) scheduler.newInstanceAvailable(instance) @@ -65,14 +69,18 @@ class ExecutionGraphRestartTest extends WordSpecLike with Matchers { eg.getState should equal(JobStatus.RUNNING) eg.getAllExecutionVertices.iterator().next().fail(new Exception("Test Exception")) + + for (vertex <- eg.getAllExecutionVertices().asScala) { + vertex.getCurrentExecutionAttempt().cancelingComplete() + } + eg.getState should equal(JobStatus.FAILED) eg.restart() eg.getState should equal(JobStatus.RUNNING) - - import collection.JavaConverters._ + for (vertex <- eg.getAllExecutionVertices.asScala) { - vertex.executionFinished() + vertex.getCurrentExecutionAttempt().markFinished() } eg.getState should equal(JobStatus.FINISHED) @@ -86,7 +94,8 @@ class ExecutionGraphRestartTest extends WordSpecLike with Matchers { "restart itself automatically" in { try { val instance = ExecutionGraphTestUtils.getInstance( - new SimpleActorGateway(TestingUtils.directExecutionContext)) + new SimpleActorGateway(TestingUtils.directExecutionContext), + NUM_TASKS) val scheduler = new Scheduler(TestingUtils.defaultExecutionContext) scheduler.newInstanceAvailable(instance) @@ -112,15 +121,19 @@ class ExecutionGraphRestartTest extends WordSpecLike with Matchers { eg.getState should equal(JobStatus.RUNNING) eg.getAllExecutionVertices.iterator().next().fail(new Exception("Test Exception")) - + eg.getState should equal(JobStatus.FAILING) + + for (vertex <- eg.getAllExecutionVertices.asScala) { + vertex.getCurrentExecutionAttempt().cancelingComplete() + } + eg.getState should equal(JobStatus.RUNNING) - - import collection.JavaConverters._ + for (vertex <- eg.getAllExecutionVertices.asScala) { - vertex.executionFinished() + vertex.getCurrentExecutionAttempt().markFinished() } - eg.getState should equal(JobStatus.FINISHED) + eg.getState() should equal(JobStatus.FINISHED) } catch { case t: Throwable => t.printStackTrace() @@ -128,5 +141,4 @@ class ExecutionGraphRestartTest extends WordSpecLike with Matchers { } } } - } http://git-wip-us.apache.org/repos/asf/flink/blob/95d035ab/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala index 7487670..ea691f1 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala @@ -19,18 +19,21 @@ package org.apache.flink.runtime.jobmanager import java.net.InetAddress -import java.util.UUID import akka.actor._ import akka.testkit.{ImplicitSender, TestKit} + import org.apache.flink.configuration.Configuration import org.apache.flink.runtime.StreamingMode import org.apache.flink.runtime.akka.AkkaUtils -import org.apache.flink.runtime.instance.{HardwareDescription, InstanceConnectionInfo, InstanceID} +import org.apache.flink.runtime.instance._ +import org.apache.flink.runtime.jobmanager.JobManagerRegistrationTest.PlainForwardingActor import org.apache.flink.runtime.messages.JobManagerMessages.LeaderSessionMessage import org.apache.flink.runtime.messages.RegistrationMessages.{AcknowledgeRegistration, AlreadyRegistered, RegisterTaskManager} + import org.junit.Assert.{assertNotEquals, assertNotNull} import org.junit.runner.RunWith + import org.scalatest.junit.JUnitRunner import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike} @@ -56,123 +59,119 @@ ImplicitSender with WordSpecLike with Matchers with BeforeAndAfterAll { "assign a TaskManager a unique instance ID" in { val jm = startTestingJobManager(_system) - - val tmDummy1 = _system.actorOf(Props(classOf[JobManagerRegistrationTest.DummyActor])) - val tmDummy2 = _system.actorOf(Props(classOf[JobManagerRegistrationTest.DummyActor])) - - try { - val connectionInfo1 = new InstanceConnectionInfo(InetAddress.getLocalHost, 10000) - val connectionInfo2 = new InstanceConnectionInfo(InetAddress.getLocalHost, 10001) - - val hardwareDescription = HardwareDescription.extractFromSystem(10) - - val leaderSessionID = UUID.randomUUID() - - var id1: InstanceID = null - var id2: InstanceID = null - - // task manager 1 - within(1 second) { - jm.tell( - RegisterTaskManager( - connectionInfo1, - hardwareDescription, - 1), - tmDummy1) - - val response = receiveOne(1 second) - response match { - case LeaderSessionMessage(leaderSessionID, AcknowledgeRegistration(id, _)) => id1 = id - case _ => fail("Wrong response message: " + response) - } + + val tm1 = _system.actorOf(Props(new PlainForwardingActor(testActor))) + val tm2 = _system.actorOf(Props(new PlainForwardingActor(testActor))) + + val connectionInfo1 = new InstanceConnectionInfo(InetAddress.getLocalHost, 10000) + val connectionInfo2 = new InstanceConnectionInfo(InetAddress.getLocalHost, 10001) + + val hardwareDescription = HardwareDescription.extractFromSystem(10) + + var id1: InstanceID = null + var id2: InstanceID = null + + // task manager 1 + within(1 second) { + jm.tell( + RegisterTaskManager( + connectionInfo1, + hardwareDescription, + 1), + new AkkaActorGateway(tm1, null)) + + val response = expectMsgType[LeaderSessionMessage] + response match { + case LeaderSessionMessage(_, AcknowledgeRegistration(id, _)) => id1 = id + case _ => fail("Wrong response message: " + response) } + } - // task manager 2 - within(1 second) { - jm.tell( - RegisterTaskManager( - connectionInfo2, - hardwareDescription, - 1), - tmDummy2) - - val response = receiveOne(1 second) - response match { - case LeaderSessionMessage(leaderSessionID, AcknowledgeRegistration(id, _)) => id2 = id - case _ => fail("Wrong response message: " + response) - } + // task manager 2 + within(1 second) { + jm.tell( + RegisterTaskManager( + connectionInfo2, + hardwareDescription, + 1), + new AkkaActorGateway(tm2, null)) + + val response = expectMsgType[LeaderSessionMessage] + response match { + case LeaderSessionMessage(leaderSessionID, AcknowledgeRegistration(id, _)) => id2 = id + case _ => fail("Wrong response message: " + response) } - - assertNotNull(id1) - assertNotNull(id2) - assertNotEquals(id1, id2) - } - finally { - tmDummy1 ! Kill - tmDummy2 ! Kill - jm ! Kill } + + assertNotNull(id1) + assertNotNull(id2) + assertNotEquals(id1, id2) } "handle repeated registration calls" in { val jm = startTestingJobManager(_system) - val tmDummy = _system.actorOf(Props(classOf[JobManagerRegistrationTest.DummyActor])) - - try { - val connectionInfo = new InstanceConnectionInfo(InetAddress.getLocalHost,1) - val hardwareDescription = HardwareDescription.extractFromSystem(10) - - within(1 second) { - jm.tell( - RegisterTaskManager( - connectionInfo, - hardwareDescription, - 1), - tmDummy) - - jm.tell( - RegisterTaskManager( - connectionInfo, - hardwareDescription, - 1), - tmDummy) - - jm.tell( - RegisterTaskManager( - connectionInfo, - hardwareDescription, - 1), - tmDummy) - - expectMsgType[AcknowledgeRegistration] - expectMsgType[AlreadyRegistered] - expectMsgType[AlreadyRegistered] + val selfGateway = new AkkaActorGateway(testActor, null) + + val connectionInfo = new InstanceConnectionInfo(InetAddress.getLocalHost,1) + val hardwareDescription = HardwareDescription.extractFromSystem(10) + + within(5 second) { + jm.tell( + RegisterTaskManager( + connectionInfo, + hardwareDescription, + 1), + selfGateway) + + jm.tell( + RegisterTaskManager( + connectionInfo, + hardwareDescription, + 1), + selfGateway) + + jm.tell( + RegisterTaskManager( + connectionInfo, + hardwareDescription, + 1), + selfGateway) + + expectMsgType[LeaderSessionMessage] match { + case LeaderSessionMessage(null, AcknowledgeRegistration(_, _)) => + case m => fail("Wrong message type: " + m) + } + + expectMsgType[LeaderSessionMessage] match { + case LeaderSessionMessage(null, AlreadyRegistered(_, _)) => + case m => fail("Wrong message type: " + m) + } + + expectMsgType[LeaderSessionMessage] match { + case LeaderSessionMessage(null, AlreadyRegistered(_, _)) => + case m => fail("Wrong message type: " + m) } - } finally { - tmDummy ! Kill - jm ! Kill } } } - private def startTestingJobManager(system: ActorSystem): ActorRef = { + private def startTestingJobManager(system: ActorSystem): ActorGateway = { val (jm: ActorRef, _) = JobManager.startJobManagerActors( new Configuration(), _system, None, None, StreamingMode.BATCH_ONLY) - jm + new AkkaActorGateway(jm, null) } } object JobManagerRegistrationTest { - - /** Simply dummy actor that swallows all messages */ - class DummyActor extends Actor { + + class PlainForwardingActor(private val target: ActorRef) extends Actor { override def receive: Receive = { - case _ => + case message => target.forward(message) } } }