http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerSlotSharingTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerSlotSharingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerSlotSharingTest.java index f987e07..3e90123 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerSlotSharingTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerSlotSharingTest.java @@ -32,11 +32,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import org.apache.flink.runtime.instance.SimpleSlot; -import akka.actor.ActorSystem; -import akka.testkit.JavaTestKit; import org.apache.flink.runtime.testingUtils.TestingUtils; -import org.junit.AfterClass; -import org.junit.BeforeClass; import org.junit.Test; import org.apache.flink.runtime.instance.Instance; import org.apache.flink.runtime.jobgraph.JobVertexID; @@ -45,21 +41,7 @@ import org.apache.flink.runtime.jobgraph.JobVertexID; * Tests for the scheduler when scheduling tasks in slot sharing groups. */ public class SchedulerSlotSharingTest { - private static ActorSystem system; - @BeforeClass - public static void setup(){ - system = ActorSystem.create("TestingActorSystem", TestingUtils.testConfig()); - TestingUtils.setCallingThreadDispatcher(system); - } - - @AfterClass - public static void teardown(){ - TestingUtils.setGlobalExecutionContext(); - JavaTestKit.shutdownActorSystem(system); - } - - @Test public void scheduleSingleVertexType() { try { @@ -67,7 +49,7 @@ public class SchedulerSlotSharingTest { SlotSharingGroup sharingGroup = new SlotSharingGroup(jid1); - Scheduler scheduler = new Scheduler(); + Scheduler scheduler = new Scheduler(TestingUtils.directExecutionContext()); Instance i1 = getRandomInstance(2); Instance i2 = getRandomInstance(2); scheduler.newInstanceAvailable(i1); @@ -154,7 +136,7 @@ public class SchedulerSlotSharingTest { SlotSharingGroup sharingGroup = new SlotSharingGroup(jid1, jid2); - Scheduler scheduler = new Scheduler(); + Scheduler scheduler = new Scheduler(TestingUtils.directExecutionContext()); scheduler.newInstanceAvailable(getRandomInstance(2)); scheduler.newInstanceAvailable(getRandomInstance(2)); @@ -274,7 +256,7 @@ public class SchedulerSlotSharingTest { SlotSharingGroup sharingGroup = new SlotSharingGroup(jid1, jid2); - Scheduler scheduler = new Scheduler(); + Scheduler scheduler = new Scheduler(TestingUtils.directExecutionContext()); scheduler.newInstanceAvailable(getRandomInstance(2)); scheduler.newInstanceAvailable(getRandomInstance(2)); @@ -339,7 +321,7 @@ public class SchedulerSlotSharingTest { SlotSharingGroup sharingGroup = new SlotSharingGroup(jid1, jid2, jid3); - Scheduler scheduler = new Scheduler(); + Scheduler scheduler = new Scheduler(TestingUtils.directExecutionContext()); scheduler.newInstanceAvailable(getRandomInstance(2)); scheduler.newInstanceAvailable(getRandomInstance(2)); @@ -450,7 +432,7 @@ public class SchedulerSlotSharingTest { SlotSharingGroup sharingGroup = new SlotSharingGroup(jid1, jid2); - Scheduler scheduler = new Scheduler(); + Scheduler scheduler = new Scheduler(TestingUtils.directExecutionContext()); scheduler.newInstanceAvailable(getRandomInstance(2)); // schedule 1 tasks from the first vertex group and 2 from the second @@ -502,7 +484,7 @@ public class SchedulerSlotSharingTest { SlotSharingGroup sharingGroup = new SlotSharingGroup(jid1, jid2); - Scheduler scheduler = new Scheduler(); + Scheduler scheduler = new Scheduler(TestingUtils.directExecutionContext()); scheduler.newInstanceAvailable(getRandomInstance(3)); scheduler.newInstanceAvailable(getRandomInstance(2)); @@ -649,7 +631,7 @@ public class SchedulerSlotSharingTest { Instance i1 = getRandomInstance(2); Instance i2 = getRandomInstance(2); - Scheduler scheduler = new Scheduler(); + Scheduler scheduler = new Scheduler(TestingUtils.directExecutionContext()); scheduler.newInstanceAvailable(i1); scheduler.newInstanceAvailable(i2); @@ -699,7 +681,7 @@ public class SchedulerSlotSharingTest { Instance i1 = getRandomInstance(2); Instance i2 = getRandomInstance(2); - Scheduler scheduler = new Scheduler(); + Scheduler scheduler = new Scheduler(TestingUtils.directExecutionContext()); scheduler.newInstanceAvailable(i1); scheduler.newInstanceAvailable(i2); @@ -749,7 +731,7 @@ public class SchedulerSlotSharingTest { Instance i1 = getRandomInstance(2); Instance i2 = getRandomInstance(2); - Scheduler scheduler = new Scheduler(); + Scheduler scheduler = new Scheduler(TestingUtils.directExecutionContext()); scheduler.newInstanceAvailable(i1); scheduler.newInstanceAvailable(i2); @@ -795,7 +777,6 @@ public class SchedulerSlotSharingTest { @Test public void testSequentialAllocateAndRelease() { - TestingUtils.setGlobalExecutionContext(); try { final JobVertexID jid1 = new JobVertexID(); final JobVertexID jid2 = new JobVertexID(); @@ -804,7 +785,7 @@ public class SchedulerSlotSharingTest { final SlotSharingGroup sharingGroup = new SlotSharingGroup(jid1, jid2, jid3, jid4); - final Scheduler scheduler = new Scheduler(); + final Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext()); scheduler.newInstanceAvailable(getRandomInstance(4)); // allocate something from group 1 and 2 interleaved with schedule for group 3 @@ -853,15 +834,13 @@ public class SchedulerSlotSharingTest { catch (Exception e) { e.printStackTrace(); fail(e.getMessage()); - }finally{ - TestingUtils.setCallingThreadDispatcher(system); } } @Test public void testConcurrentAllocateAndRelease() { final ExecutorService executor = Executors.newFixedThreadPool(20); - TestingUtils.setGlobalExecutionContext(); + try { for (int run = 0; run < 50; run++) { final JobVertexID jid1 = new JobVertexID(); @@ -871,7 +850,7 @@ public class SchedulerSlotSharingTest { final SlotSharingGroup sharingGroup = new SlotSharingGroup(jid1, jid2, jid3, jid4); - final Scheduler scheduler = new Scheduler(); + final Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext()); scheduler.newInstanceAvailable(getRandomInstance(4)); final AtomicInteger enumerator1 = new AtomicInteger(); @@ -1030,10 +1009,6 @@ public class SchedulerSlotSharingTest { e.printStackTrace(); fail(e.getMessage()); } - finally { - executor.shutdownNow(); - TestingUtils.setCallingThreadDispatcher(system); - } } @Test @@ -1046,7 +1021,7 @@ public class SchedulerSlotSharingTest { SlotSharingGroup sharingGroup = new SlotSharingGroup(jid1, jid2, jid3, jid4); - Scheduler scheduler = new Scheduler(); + Scheduler scheduler = new Scheduler(TestingUtils.directExecutionContext()); scheduler.newInstanceAvailable(getRandomInstance(4)); // schedule one task for the first and second vertex
http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java index 694b88b..2de0635 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java @@ -29,9 +29,9 @@ import java.util.Collections; import java.util.HashSet; import java.util.concurrent.atomic.AtomicInteger; -import akka.actor.ActorRef; import org.apache.flink.runtime.executiongraph.Execution; import org.apache.flink.runtime.executiongraph.ExecutionVertex; +import org.apache.flink.runtime.instance.DummyInstanceGateway; import org.apache.flink.runtime.instance.HardwareDescription; import org.apache.flink.runtime.instance.Instance; import org.apache.flink.runtime.instance.InstanceConnectionInfo; @@ -66,7 +66,7 @@ public class SchedulerTestUtils { final long GB = 1024L*1024*1024; HardwareDescription resources = new HardwareDescription(4, 4*GB, 3*GB, 2*GB); - return new Instance(ActorRef.noSender(), ci, new InstanceID(), resources, numSlots); + return new Instance(DummyInstanceGateway.INSTANCE, ci, new InstanceID(), resources, numSlots); } http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java index a67cd00..442ddcf 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java @@ -39,6 +39,7 @@ import org.apache.flink.runtime.memorymanager.DefaultMemoryManager; import org.apache.flink.runtime.memorymanager.MemoryManager; import org.apache.flink.runtime.messages.TaskManagerMessages; +import org.apache.flink.runtime.testingUtils.TestingUtils; import org.junit.Test; import scala.Option; import scala.Tuple2; @@ -89,7 +90,10 @@ public class TaskManagerComponentsStartupShutdownTest { final MemoryManager memManager = new DefaultMemoryManager(32 * BUFFER_SIZE, 1, BUFFER_SIZE, false); final IOManager ioManager = new IOManagerAsync(TMP_DIR); - final NetworkEnvironment network = new NetworkEnvironment(timeout, netConf); + final NetworkEnvironment network = new NetworkEnvironment( + TestingUtils.defaultExecutionContext(), + timeout, + netConf); final int numberOfSlots = 1; // create the task manager http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.scala index a1ca43c..b3090e6 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.scala @@ -18,36 +18,27 @@ package org.apache.flink.runtime.executiongraph -import akka.actor.{Props, ActorSystem} -import akka.testkit.TestKit import org.apache.flink.api.common.JobID import org.apache.flink.configuration.Configuration import org.apache.flink.runtime.akka.AkkaUtils +import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.SimpleInstanceGateway import org.apache.flink.runtime.jobgraph.{JobStatus, JobGraph, JobVertex} import org.apache.flink.runtime.jobmanager.Tasks import org.apache.flink.runtime.jobmanager.scheduler.Scheduler import org.apache.flink.runtime.testingUtils.TestingUtils -import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike} +import org.scalatest.{Matchers, WordSpecLike} -class ExecutionGraphRestartTest(_system: ActorSystem) extends TestKit(_system) with WordSpecLike -with Matchers with BeforeAndAfterAll { - - def this() = this(ActorSystem("TestingActorSystem", TestingUtils.testConfig)) - - override def afterAll(): Unit = { - TestKit.shutdownActorSystem(system) - } +class ExecutionGraphRestartTest extends WordSpecLike with Matchers { val NUM_TASKS = 31 "The execution graph" must { "be manually restartable" in { try { - val tm = system.actorOf(Props(classOf[ExecutionGraphTestUtils - .SimpleAcknowledgingTaskManager], "TaskManager")) - val instance = ExecutionGraphTestUtils.getInstance(tm) + val instance = ExecutionGraphTestUtils.getInstance( + new SimpleInstanceGateway(TestingUtils.directExecutionContext)) - val scheduler = new Scheduler + val scheduler = new Scheduler(TestingUtils.defaultExecutionContext) scheduler.newInstanceAvailable(instance) val sender = new JobVertex("Task") @@ -56,7 +47,11 @@ with Matchers with BeforeAndAfterAll { val jobGraph = new JobGraph("Pointwise job", sender) - val eg = new ExecutionGraph(new JobID(), "test job", new Configuration(), + val eg = new ExecutionGraph( + TestingUtils.defaultExecutionContext, + new JobID(), + "test job", + new Configuration(), AkkaUtils.getDefaultTimeout) eg.setNumberOfRetriesLeft(0) eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources) @@ -87,11 +82,10 @@ with Matchers with BeforeAndAfterAll { "restart itself automatically" in { try { - val tm = system.actorOf(Props - (classOf[ExecutionGraphTestUtils.SimpleAcknowledgingTaskManager], "TaskManager")) - val instance = ExecutionGraphTestUtils.getInstance(tm) + val instance = ExecutionGraphTestUtils.getInstance( + new SimpleInstanceGateway(TestingUtils.directExecutionContext)) - val scheduler = new Scheduler + val scheduler = new Scheduler(TestingUtils.defaultExecutionContext) scheduler.newInstanceAvailable(instance) val sender = new JobVertex("Task") @@ -100,7 +94,11 @@ with Matchers with BeforeAndAfterAll { val jobGraph = new JobGraph("Pointwise job", sender) - val eg = new ExecutionGraph(new JobID(), "Test job", new Configuration(), + val eg = new ExecutionGraph( + TestingUtils.defaultExecutionContext, + new JobID(), + "Test job", + new Configuration(), AkkaUtils.getDefaultTimeout) eg.setNumberOfRetriesLeft(1) eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources) http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala index 13199bc..aaa0025 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala @@ -18,38 +18,27 @@ package org.apache.flink.runtime.executiongraph -import akka.actor.{Props, ActorSystem} -import akka.testkit.TestKit import org.apache.flink.api.common.JobID import org.apache.flink.configuration.Configuration import org.apache.flink.runtime.akka.AkkaUtils -import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils -.SimpleAcknowledgingTaskManager +import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.SimpleInstanceGateway import org.apache.flink.runtime.jobgraph.{JobStatus, JobGraph, JobVertex} import org.apache.flink.runtime.jobmanager.Tasks import org.apache.flink.runtime.jobmanager.scheduler.Scheduler import org.apache.flink.runtime.testingUtils.TestingUtils -import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike} +import org.scalatest.{Matchers, WordSpecLike} -class TaskManagerLossFailsTasksTest(_system: ActorSystem) extends TestKit(_system) with -WordSpecLike with Matchers with BeforeAndAfterAll { - - def this() = this(ActorSystem("TestingActorSystem", TestingUtils.testConfig)) - - override def afterAll(): Unit = { - TestKit.shutdownActorSystem(system) - } +class TaskManagerLossFailsTasksTest extends WordSpecLike with Matchers { "A task manager loss" must { "fail the assigned tasks" in { try { - val tm1 = system.actorOf(Props(classOf[SimpleAcknowledgingTaskManager], "TaskManager1")) - val tm2 = system.actorOf(Props(classOf[SimpleAcknowledgingTaskManager], "TaskManager2")) - - val instance1 = ExecutionGraphTestUtils.getInstance(tm1, 10) - val instance2 = ExecutionGraphTestUtils.getInstance(tm2, 10) + val instance1 = ExecutionGraphTestUtils.getInstance( + new SimpleInstanceGateway(TestingUtils.defaultExecutionContext), 10) + val instance2 = ExecutionGraphTestUtils.getInstance( + new SimpleInstanceGateway(TestingUtils.defaultExecutionContext), 10) - val scheduler = new Scheduler + val scheduler = new Scheduler(TestingUtils.defaultExecutionContext) scheduler.newInstanceAvailable(instance1) scheduler.newInstanceAvailable(instance2) @@ -59,7 +48,11 @@ WordSpecLike with Matchers with BeforeAndAfterAll { val jobGraph = new JobGraph("Pointwise job", sender) - val eg = new ExecutionGraph(new JobID(), "test job", new Configuration(), + val eg = new ExecutionGraph( + TestingUtils.defaultExecutionContext, + new JobID(), + "test job", + new Configuration(), AkkaUtils.getDefaultTimeout) eg.setNumberOfRetriesLeft(0) eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources) http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala index 766ea55..f7bf56a 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala @@ -177,12 +177,12 @@ WordSpecLike with Matchers with BeforeAndAfterAll { jm ! NotifyWhenJobStatus(jobGraph.getJobID, JobStatus.RESTARTING) jm ! RequestWorkingTaskManager(jobGraph.getJobID) - val WorkingTaskManager(tm) = expectMsgType[WorkingTaskManager] + val WorkingTaskManager(gatewayOption) = expectMsgType[WorkingTaskManager] - tm match { - case ActorRef.noSender => fail("There has to be at least one task manager on which" + + gatewayOption match { + case None => fail("There has to be at least one task manager on which" + "the tasks are running.") - case t => t ! PoisonPill + case Some(gateway) => gateway.tell(PoisonPill) } expectMsg(JobStatusIs(jobGraph.getJobID, JobStatus.RESTARTING)) http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala index 11e93d6..219e5ae 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala @@ -60,16 +60,34 @@ class TestingCluster(userConfiguration: Configuration, override def startJobManager(actorSystem: ActorSystem): ActorRef = { - val (instanceManager, scheduler, libraryCacheManager, _, accumulatorManager, - executionRetries, delayBetweenRetries, - timeout, archiveCount) = JobManager.createJobManagerComponents(configuration) + val (executionContext, + instanceManager, + scheduler, + libraryCacheManager, + _, + accumulatorManager, + executionRetries, + delayBetweenRetries, + timeout, + archiveCount) = JobManager.createJobManagerComponents(configuration) val testArchiveProps = Props(new MemoryArchivist(archiveCount) with TestingMemoryArchivist) val archive = actorSystem.actorOf(testArchiveProps, JobManager.ARCHIVE_NAME) - val jobManagerProps = Props(new JobManager(configuration, instanceManager, scheduler, - libraryCacheManager, archive, accumulatorManager, executionRetries, - delayBetweenRetries, timeout, streamingMode) with TestingJobManager) + val jobManagerProps = Props( + new JobManager( + configuration, + executionContext, + instanceManager, + scheduler, + libraryCacheManager, + archive, + accumulatorManager, + executionRetries, + delayBetweenRetries, + timeout, + streamingMode) + with TestingJobManager) actorSystem.actorOf(jobManagerProps, JobManager.JOB_MANAGER_NAME) } http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala index 89e1d72..5747b7e 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala @@ -106,11 +106,10 @@ trait TestingJobManager extends ActorLogMessages with WrapAsScala { case NotifyWhenJobRemoved(jobID) => - val tms = instanceManager.getAllRegisteredInstances.map(_.getTaskManager) + val gateways = instanceManager.getAllRegisteredInstances.map(_.getInstanceGateway) - val responses = tms.map{ - tm => - (tm ? NotifyWhenJobRemoved(jobID))(timeout).mapTo[Boolean] + val responses = gateways.map{ + gateway => gateway.ask(NotifyWhenJobRemoved(jobID), timeout).mapTo[Boolean] } import context.dispatcher @@ -135,17 +134,17 @@ trait TestingJobManager extends ActorLogMessages with WrapAsScala { currentJobs.get(jobID) match { case Some((eg, _)) => if(eg.getAllExecutionVertices.isEmpty){ - sender ! WorkingTaskManager(ActorRef.noSender) + sender ! WorkingTaskManager(None) } else { val resource = eg.getAllExecutionVertices.head.getCurrentAssignedResource if(resource == null){ - sender ! WorkingTaskManager(ActorRef.noSender) + sender ! WorkingTaskManager(None) } else { - sender ! WorkingTaskManager(resource.getInstance().getTaskManager) + sender ! WorkingTaskManager(Some(resource.getInstance().getInstanceGateway)) } } - case None => sender ! WorkingTaskManager(ActorRef.noSender) + case None => sender ! WorkingTaskManager(None) } case NotifyWhenJobStatus(jobID, state) => http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala index f810749..241c6c0 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala @@ -21,6 +21,7 @@ package org.apache.flink.runtime.testingUtils import akka.actor.ActorRef import org.apache.flink.api.common.JobID import org.apache.flink.runtime.executiongraph.ExecutionGraph +import org.apache.flink.runtime.instance.InstanceGateway import org.apache.flink.runtime.jobgraph.JobStatus object TestingJobManagerMessages { @@ -43,7 +44,7 @@ object TestingJobManagerMessages { case class NotifyWhenJobRemoved(jobID: JobID) case class RequestWorkingTaskManager(jobID: JobID) - case class WorkingTaskManager(taskManager: ActorRef) + case class WorkingTaskManager(gatewayOption: Option[InstanceGateway]) case class NotifyWhenJobStatus(jobID: JobID, state: JobStatus) case class JobStatusIs(jobID: JobID, state: JobStatus) http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala index 3611633..914f37c 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala @@ -18,14 +18,12 @@ package org.apache.flink.runtime.testingUtils -import akka.actor.{ActorRef, ActorSystem} -import akka.testkit.CallingThreadDispatcher +import com.google.common.util.concurrent.MoreExecutors import com.typesafe.config.ConfigFactory import org.apache.flink.configuration.{ConfigConstants, Configuration} import org.apache.flink.runtime.akka.AkkaUtils -import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.ActionQueue import scala.concurrent.duration._ import scala.concurrent.ExecutionContext @@ -67,19 +65,33 @@ object TestingUtils { new TestingCluster(config) } - def setGlobalExecutionContext(): Unit = { - AkkaUtils.globalExecutionContext = ExecutionContext.global + /** Returns the global [[ExecutionContext]] which is a [[scala.concurrent.forkjoin.ForkJoinPool]] + * with a default parallelism equal to the number of available cores. + * + * @return ExecutionContext.global + */ + def defaultExecutionContext = ExecutionContext.global + + /** Returns an [[ExecutionContext]] which uses the current thread to execute the runnable. + * + * @return Direct [[ExecutionContext]] which executes runnables directly + */ + def directExecutionContext = ExecutionContext.fromExecutor(MoreExecutors.directExecutor()) + + /** @return A new [[QueuedActionExecutionContext]] */ + def queuedActionExecutionContext = { + new QueuedActionExecutionContext(new ActionQueue()) } - def setCallingThreadDispatcher(system: ActorSystem): Unit = { - AkkaUtils.globalExecutionContext = system.dispatchers.lookup(CallingThreadDispatcher.Id) - } - - def setExecutionContext(context: ExecutionContext): Unit = { - AkkaUtils.globalExecutionContext = context - } + /** [[ExecutionContext]] which queues [[Runnable]] up in an [[ActionQueue]] instead of + * execution them. If the automatic execution mode is activated, then the [[Runnable]] are + * executed. + * + * @param actionQueue + */ + class QueuedActionExecutionContext private[testingUtils] (val actionQueue: ActionQueue) + extends ExecutionContext { - class QueuedActionExecutionContext(queue: ActionQueue) extends ExecutionContext { var automaticExecution = false def toggleAutomaticExecution() = { @@ -90,7 +102,7 @@ object TestingUtils { if(automaticExecution){ runnable.run() }else { - queue.queueAction(runnable) + actionQueue.queueAction(runnable) } } @@ -98,4 +110,26 @@ object TestingUtils { t.printStackTrace() } } + + /** Queue which stores [[Runnable]] */ + class ActionQueue { + private val runnables = scala.collection.mutable.Queue[Runnable]() + + def triggerNextAction { + val r = runnables.dequeue + r.run() + } + + def popNextAction: Runnable = { + runnables.dequeue() + } + + def queueAction(r: Runnable) { + runnables.enqueue(r) + } + + def isEmpty: Boolean = { + runnables.isEmpty + } + } } http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java ---------------------------------------------------------------------- diff --git a/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java b/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java index a0db4d6..e108970 100644 --- a/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java +++ b/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java @@ -30,8 +30,8 @@ import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.StreamingMode; -import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages; +import org.apache.flink.runtime.testingUtils.TestingUtils; import org.apache.hadoop.fs.FileSystem; import org.junit.Assert; import org.slf4j.Logger; @@ -148,7 +148,7 @@ public class TestBaseUtils { } Future<Iterable<Object>> bcVariableManagerFutureResponses = Futures.sequence( - bcVariableManagerResponseFutures, AkkaUtils.globalExecutionContext()); + bcVariableManagerResponseFutures, TestingUtils.defaultExecutionContext()); Iterable<Object> responses = Await.result(bcVariableManagerFutureResponses, timeout); @@ -158,7 +158,7 @@ public class TestBaseUtils { } Future<Iterable<Object>> numActiveConnectionsFutureResponses = Futures.sequence( - numActiveConnectionsResponseFutures, AkkaUtils.globalExecutionContext()); + numActiveConnectionsResponseFutures, TestingUtils.defaultExecutionContext()); responses = Await.result(numActiveConnectionsFutureResponses, timeout); http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala ---------------------------------------------------------------------- diff --git a/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala b/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala index 796ea09..0534178 100644 --- a/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala +++ b/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala @@ -80,16 +80,38 @@ class ForkableFlinkMiniCluster(userConfiguration: Configuration, override def startJobManager(actorSystem: ActorSystem): ActorRef = { - val (instanceManager, scheduler, libraryCacheManager, _, accumulatorManager, - executionRetries, delayBetweenRetries, - timeout, archiveCount) = JobManager.createJobManagerComponents(configuration) + val (executionContext, + instanceManager, + scheduler, + libraryCacheManager, + _, + accumulatorManager, + executionRetries, + delayBetweenRetries, + timeout, + archiveCount) = JobManager.createJobManagerComponents(configuration) + + val testArchiveProps = Props( + new MemoryArchivist( + archiveCount) + with TestingMemoryArchivist) - val testArchiveProps = Props(new MemoryArchivist(archiveCount) with TestingMemoryArchivist) val archive = actorSystem.actorOf(testArchiveProps, JobManager.ARCHIVE_NAME) - val jobManagerProps = Props(new JobManager(configuration, instanceManager, scheduler, - libraryCacheManager, archive, accumulatorManager, executionRetries, - delayBetweenRetries, timeout, streamingMode) with TestingJobManager) + val jobManagerProps = Props( + new JobManager( + configuration, + executionContext, + instanceManager, + scheduler, + libraryCacheManager, + archive, + accumulatorManager, + executionRetries, + delayBetweenRetries, + timeout, + streamingMode) + with TestingJobManager) val jobManager = actorSystem.actorOf(jobManagerProps, JobManager.JOB_MANAGER_NAME) http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala index 1884fab..c20f621 100644 --- a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala +++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala @@ -110,9 +110,15 @@ with WordSpecLike with Matchers with BeforeAndAfterAll { jm ! RequestWorkingTaskManager(jobID) - val tm = expectMsgType[WorkingTaskManager].taskManager - // kill one task manager - tm ! PoisonPill + val gatewayOption = expectMsgType[WorkingTaskManager].gatewayOption + + gatewayOption match { + case Some(gateway) => + // kill one task manager + gateway.tell(PoisonPill) + + case None => fail("Could not retrieve a working task manager.") + } val failure = expectMsgType[Failure] http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala index 5dd197d..8cfeead 100644 --- a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala +++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala @@ -235,16 +235,34 @@ object ApplicationMaster { // start all the components inside the job manager LOG.debug("Starting JobManager components") - val (instanceManager, scheduler, libraryCacheManager, archiveProps, accumulatorManager, - executionRetries, delayBetweenRetries, - timeout, _) = JobManager.createJobManagerComponents(configuration) + val (executionContext, + instanceManager, + scheduler, + libraryCacheManager, + archiveProps, + accumulatorManager, + executionRetries, + delayBetweenRetries, + timeout, + _) = JobManager.createJobManagerComponents(configuration) // start the archiver val archiver: ActorRef = jobManagerSystem.actorOf(archiveProps, JobManager.ARCHIVE_NAME) - val jobManagerProps = Props(new JobManager(configuration, instanceManager, scheduler, - libraryCacheManager, archiver, accumulatorManager, executionRetries, - delayBetweenRetries, timeout, streamingMode) with ApplicationMasterActor) + val jobManagerProps = Props( + new JobManager( + configuration, + executionContext, + instanceManager, + scheduler, + libraryCacheManager, + archiver, + accumulatorManager, + executionRetries, + delayBetweenRetries, + timeout, + streamingMode) + with ApplicationMasterActor) LOG.debug("Starting JobManager actor") val jobManager = JobManager.startActor(jobManagerProps, jobManagerSystem) http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterActor.scala ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterActor.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterActor.scala index 411808b..3fb5e30 100644 --- a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterActor.scala +++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterActor.scala @@ -103,7 +103,7 @@ trait ApplicationMasterActor extends ActorLogMessages { instanceManager.getAllRegisteredInstances.asScala foreach { instance => - instance.getTaskManager ! StopYarnSession(status, diag) + instance.getInstanceGateway.tell(StopYarnSession(status, diag)) } rmClientOption foreach {
