[FLINK-2619] [tests] Fix failing ExecutionGraphRestartTest and JobManagerRegistrationTest


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/95d035ab
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/95d035ab
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/95d035ab

Branch: refs/heads/master
Commit: 95d035ab363d13a53ec37894678c9a6a4896e9dd
Parents: c9edd9a
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Sep 9 16:00:25 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Sep 9 19:10:51 2015 +0200

----------------------------------------------------------------------
 .../runtime/executiongraph/ExecutionGraph.java  |  31 +--
 .../executiongraph/ExecutionGraphTestUtils.java |   4 +
 .../ExecutionGraphRestartTest.scala             |  34 ++--
 .../jobmanager/JobManagerRegistrationTest.scala | 187 +++++++++----------
 4 files changed, 138 insertions(+), 118 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/95d035ab/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index cde1741..a44fc82 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -857,20 +857,25 @@ public class ExecutionGraph implements Serializable {
                                        else if (current == JobStatus.FAILING) {
                                                if (numberOfRetriesLeft > 0 && 
transitionState(current, JobStatus.RESTARTING)) {
                                                        numberOfRetriesLeft--;
-                                                       future(new 
Callable<Object>() {
-                                                               @Override
-                                                               public Object 
call() throws Exception {
-                                                                       try {
-                                                                               
LOG.info("Delaying retry of job execution for {} ms ...", delayBeforeRetrying);
-                                                                               
Thread.sleep(delayBeforeRetrying);
+                                                       
+                                                       if (delayBeforeRetrying 
> 0) {
+                                                               future(new 
Callable<Object>() {
+                                                                       
@Override
+                                                                       public 
Object call() throws Exception {
+                                                                               
try {
+                                                                               
        LOG.info("Delaying retry of job execution for {} ms ...", 
delayBeforeRetrying);
+                                                                               
        Thread.sleep(delayBeforeRetrying);
+                                                                               
}
+                                                                               
catch(InterruptedException e){
+                                                                               
        // should only happen on shutdown
+                                                                               
}
+                                                                               
restart();
+                                                                               
return null;
                                                                        }
-                                                                       
catch(InterruptedException e){
-                                                                               
// should only happen on shutdown
-                                                                       }
-                                                                       
restart();
-                                                                       return 
null;
-                                                               }
-                                                       }, executionContext);
+                                                               }, 
executionContext);
+                                                       } else {
+                                                               restart();
+                                                       }
                                                        break;
                                                }
                                                else if (numberOfRetriesLeft <= 
0 && transitionState(current, JobStatus.FAILED, failureCause)) {

http://git-wip-us.apache.org/repos/asf/flink/blob/95d035ab/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
index 64d4c44..ad30b9e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
@@ -112,7 +112,9 @@ public class ExecutionGraphTestUtils {
                return new Instance(gateway, connection, new InstanceID(), 
hardwareDescription, numberOfSlots);
        }
 
+       @SuppressWarnings("serial")
        public static class SimpleActorGateway extends BaseTestingActorGateway {
+               
                public TaskDeploymentDescriptor lastTDD;
 
                public SimpleActorGateway(ExecutionContext executionContext){
@@ -139,7 +141,9 @@ public class ExecutionGraphTestUtils {
                }
        }
 
+       @SuppressWarnings("serial")
        public static class SimpleFailingActorGateway extends 
BaseTestingActorGateway {
+
                public SimpleFailingActorGateway(ExecutionContext 
executionContext) {
                        super(executionContext);
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/95d035ab/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.scala
index 434a8cb..d1b8fac 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.scala
@@ -26,10 +26,13 @@ import org.apache.flink.runtime.jobgraph.{JobStatus, JobGraph, JobVertex}
 import org.apache.flink.runtime.jobmanager.Tasks
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler
 import org.apache.flink.runtime.testingUtils.TestingUtils
+
 import org.junit.runner.RunWith
 import org.scalatest.junit.JUnitRunner
 import org.scalatest.{Matchers, WordSpecLike}
 
+import scala.collection.JavaConverters._
+
 @RunWith(classOf[JUnitRunner])
 class ExecutionGraphRestartTest extends WordSpecLike with Matchers {
 
@@ -39,7 +42,8 @@ class ExecutionGraphRestartTest extends WordSpecLike with 
Matchers {
     "be manually restartable" in {
       try {
         val instance = ExecutionGraphTestUtils.getInstance(
-          new SimpleActorGateway(TestingUtils.directExecutionContext))
+          new SimpleActorGateway(TestingUtils.directExecutionContext),
+            NUM_TASKS)
 
         val scheduler = new Scheduler(TestingUtils.defaultExecutionContext)
         scheduler.newInstanceAvailable(instance)
@@ -65,14 +69,18 @@ class ExecutionGraphRestartTest extends WordSpecLike with 
Matchers {
         eg.getState should equal(JobStatus.RUNNING)
 
         eg.getAllExecutionVertices.iterator().next().fail(new Exception("Test 
Exception"))
+        
+        for (vertex <- eg.getAllExecutionVertices().asScala) {
+          vertex.getCurrentExecutionAttempt().cancelingComplete()
+        }
+        
         eg.getState should equal(JobStatus.FAILED)
 
         eg.restart()
         eg.getState should equal(JobStatus.RUNNING)
-
-        import collection.JavaConverters._
+        
         for (vertex <- eg.getAllExecutionVertices.asScala) {
-          vertex.executionFinished()
+          vertex.getCurrentExecutionAttempt().markFinished()
         }
 
         eg.getState should equal(JobStatus.FINISHED)
@@ -86,7 +94,8 @@ class ExecutionGraphRestartTest extends WordSpecLike with 
Matchers {
     "restart itself automatically" in {
       try {
         val instance = ExecutionGraphTestUtils.getInstance(
-          new SimpleActorGateway(TestingUtils.directExecutionContext))
+          new SimpleActorGateway(TestingUtils.directExecutionContext),
+          NUM_TASKS)
 
         val scheduler = new Scheduler(TestingUtils.defaultExecutionContext)
         scheduler.newInstanceAvailable(instance)
@@ -112,15 +121,19 @@ class ExecutionGraphRestartTest extends WordSpecLike with 
Matchers {
         eg.getState should equal(JobStatus.RUNNING)
 
         eg.getAllExecutionVertices.iterator().next().fail(new Exception("Test 
Exception"))
-
+        eg.getState should equal(JobStatus.FAILING)
+        
+        for (vertex <- eg.getAllExecutionVertices.asScala) {
+          vertex.getCurrentExecutionAttempt().cancelingComplete()
+        }
+        
         eg.getState should equal(JobStatus.RUNNING)
-
-        import collection.JavaConverters._
+        
         for (vertex <- eg.getAllExecutionVertices.asScala) {
-          vertex.executionFinished()
+          vertex.getCurrentExecutionAttempt().markFinished()
         }
 
-        eg.getState should equal(JobStatus.FINISHED)
+        eg.getState() should equal(JobStatus.FINISHED)
       } catch {
         case t: Throwable =>
           t.printStackTrace()
@@ -128,5 +141,4 @@ class ExecutionGraphRestartTest extends WordSpecLike with 
Matchers {
       }
     }
   }
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/95d035ab/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala
index 7487670..ea691f1 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala
@@ -19,18 +19,21 @@
 package org.apache.flink.runtime.jobmanager
 
 import java.net.InetAddress
-import java.util.UUID
 
 import akka.actor._
 import akka.testkit.{ImplicitSender, TestKit}
+
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.runtime.StreamingMode
 import org.apache.flink.runtime.akka.AkkaUtils
-import org.apache.flink.runtime.instance.{HardwareDescription, 
InstanceConnectionInfo, InstanceID}
+import org.apache.flink.runtime.instance._
+import 
org.apache.flink.runtime.jobmanager.JobManagerRegistrationTest.PlainForwardingActor
 import 
org.apache.flink.runtime.messages.JobManagerMessages.LeaderSessionMessage
 import 
org.apache.flink.runtime.messages.RegistrationMessages.{AcknowledgeRegistration,
 AlreadyRegistered, RegisterTaskManager}
+
 import org.junit.Assert.{assertNotEquals, assertNotNull}
 import org.junit.runner.RunWith
+
 import org.scalatest.junit.JUnitRunner
 import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}
 
@@ -56,123 +59,119 @@ ImplicitSender with WordSpecLike with Matchers with BeforeAndAfterAll {
 
     "assign a TaskManager a unique instance ID" in {
       val jm = startTestingJobManager(_system)
-
-      val tmDummy1 = 
_system.actorOf(Props(classOf[JobManagerRegistrationTest.DummyActor]))
-      val tmDummy2 = 
_system.actorOf(Props(classOf[JobManagerRegistrationTest.DummyActor]))
-
-      try {
-        val connectionInfo1 = new 
InstanceConnectionInfo(InetAddress.getLocalHost, 10000)
-        val connectionInfo2 = new 
InstanceConnectionInfo(InetAddress.getLocalHost, 10001)
-
-        val hardwareDescription = HardwareDescription.extractFromSystem(10)
-
-        val leaderSessionID = UUID.randomUUID()
-
-        var id1: InstanceID = null
-        var id2: InstanceID = null
-
-        // task manager 1
-        within(1 second) {
-          jm.tell(
-            RegisterTaskManager(
-              connectionInfo1,
-              hardwareDescription,
-              1),
-            tmDummy1)
-
-          val response = receiveOne(1 second)
-          response match {
-            case LeaderSessionMessage(leaderSessionID, 
AcknowledgeRegistration(id, _)) => id1 = id
-            case _ => fail("Wrong response message: " + response)
-          }
+      
+      val tm1 = _system.actorOf(Props(new PlainForwardingActor(testActor)))
+      val tm2 = _system.actorOf(Props(new PlainForwardingActor(testActor)))
+      
+      val connectionInfo1 = new 
InstanceConnectionInfo(InetAddress.getLocalHost, 10000)
+      val connectionInfo2 = new 
InstanceConnectionInfo(InetAddress.getLocalHost, 10001)
+
+      val hardwareDescription = HardwareDescription.extractFromSystem(10)
+      
+      var id1: InstanceID = null
+      var id2: InstanceID = null
+
+      // task manager 1
+      within(1 second) {
+        jm.tell(
+          RegisterTaskManager(
+            connectionInfo1,
+            hardwareDescription,
+            1),
+          new AkkaActorGateway(tm1, null))
+
+        val response = expectMsgType[LeaderSessionMessage]
+        response match {
+          case LeaderSessionMessage(_, AcknowledgeRegistration(id, _)) => id1 
= id
+          case _ => fail("Wrong response message: " + response)
         }
+      }
 
-        // task manager 2
-        within(1 second) {
-          jm.tell(
-            RegisterTaskManager(
-              connectionInfo2,
-              hardwareDescription,
-              1),
-            tmDummy2)
-
-          val response = receiveOne(1 second)
-          response match {
-            case LeaderSessionMessage(leaderSessionID, 
AcknowledgeRegistration(id, _)) => id2 = id
-            case _ => fail("Wrong response message: " + response)
-          }
+      // task manager 2
+      within(1 second) {
+        jm.tell(
+          RegisterTaskManager(
+            connectionInfo2,
+            hardwareDescription,
+            1),
+          new AkkaActorGateway(tm2, null))
+
+        val response = expectMsgType[LeaderSessionMessage]
+        response match {
+          case LeaderSessionMessage(leaderSessionID, 
AcknowledgeRegistration(id, _)) => id2 = id
+          case _ => fail("Wrong response message: " + response)
         }
-
-        assertNotNull(id1)
-        assertNotNull(id2)
-        assertNotEquals(id1, id2)
-      }
-      finally {
-        tmDummy1 ! Kill
-        tmDummy2 ! Kill
-        jm ! Kill
       }
+
+      assertNotNull(id1)
+      assertNotNull(id2)
+      assertNotEquals(id1, id2)
     }
 
     "handle repeated registration calls" in {
 
       val jm = startTestingJobManager(_system)
-      val tmDummy = 
_system.actorOf(Props(classOf[JobManagerRegistrationTest.DummyActor]))
-
-      try {
-        val connectionInfo = new 
InstanceConnectionInfo(InetAddress.getLocalHost,1)
-        val hardwareDescription = HardwareDescription.extractFromSystem(10)
-
-        within(1 second) {
-          jm.tell(
-            RegisterTaskManager(
-              connectionInfo,
-              hardwareDescription,
-              1),
-            tmDummy)
-
-          jm.tell(
-            RegisterTaskManager(
-              connectionInfo,
-              hardwareDescription,
-              1),
-            tmDummy)
-
-          jm.tell(
-            RegisterTaskManager(
-              connectionInfo,
-              hardwareDescription,
-              1),
-            tmDummy)
-
-          expectMsgType[AcknowledgeRegistration]
-          expectMsgType[AlreadyRegistered]
-          expectMsgType[AlreadyRegistered]
+      val selfGateway = new AkkaActorGateway(testActor, null)
+      
+      val connectionInfo = new 
InstanceConnectionInfo(InetAddress.getLocalHost,1)
+      val hardwareDescription = HardwareDescription.extractFromSystem(10)
+
+      within(5 second) {
+        jm.tell(
+          RegisterTaskManager(
+            connectionInfo,
+            hardwareDescription,
+            1),
+          selfGateway)
+
+        jm.tell(
+          RegisterTaskManager(
+            connectionInfo,
+            hardwareDescription,
+            1),
+          selfGateway)
+
+        jm.tell(
+          RegisterTaskManager(
+            connectionInfo,
+            hardwareDescription,
+            1),
+          selfGateway)
+
+        expectMsgType[LeaderSessionMessage] match {
+          case LeaderSessionMessage(null, AcknowledgeRegistration(_, _)) =>
+          case m => fail("Wrong message type: " + m)
+        }
+
+        expectMsgType[LeaderSessionMessage] match {
+          case LeaderSessionMessage(null, AlreadyRegistered(_, _)) =>
+          case m => fail("Wrong message type: " + m)
+        }
+
+        expectMsgType[LeaderSessionMessage] match {
+          case LeaderSessionMessage(null, AlreadyRegistered(_, _)) =>
+          case m => fail("Wrong message type: " + m)
         }
-      } finally {
-        tmDummy ! Kill
-        jm ! Kill
       }
     }
   }
 
-  private def startTestingJobManager(system: ActorSystem): ActorRef = {
+  private def startTestingJobManager(system: ActorSystem): ActorGateway = {
     val (jm: ActorRef, _) = JobManager.startJobManagerActors(
       new Configuration(),
       _system,
       None,
       None,
       StreamingMode.BATCH_ONLY)
-    jm
+    new AkkaActorGateway(jm, null)
   }
 }
 
 object JobManagerRegistrationTest {
-
-  /** Simply dummy actor that swallows all messages */
-  class DummyActor extends Actor {
+  
+  class PlainForwardingActor(private val target: ActorRef) extends Actor {
     override def receive: Receive = {
-      case _ =>
+      case message => target.forward(message)
     }
   }
 }

Reply via email to