http://git-wip-us.apache.org/repos/asf/flink/blob/59aefb57/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java index 579ca3a..6a0bd87 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java @@ -47,13 +47,13 @@ import org.apache.flink.runtime.resourcemanager.ResourceManager; import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration; import org.apache.flink.runtime.resourcemanager.SlotRequest; import org.apache.flink.runtime.resourcemanager.StandaloneResourceManager; -import org.apache.flink.runtime.resourcemanager.slotmanager.DefaultSlotManager; -import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerFactory; +import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager; import org.apache.flink.runtime.rpc.TestingSerialRpcService; import org.apache.flink.runtime.taskexecutor.slot.SlotOffer; import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable; import org.apache.flink.runtime.taskexecutor.slot.TimerService; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; +import org.apache.flink.runtime.testingUtils.TestingUtils; import org.apache.flink.runtime.util.TestingFatalErrorHandler; import org.apache.flink.util.TestLogger; import org.hamcrest.Matchers; @@ -102,7 +102,6 @@ public class TaskExecutorITCase extends TestLogger { ResourceManagerConfiguration resourceManagerConfiguration = new ResourceManagerConfiguration( Time.milliseconds(500L), Time.milliseconds(500L)); - SlotManagerFactory slotManagerFactory = new DefaultSlotManager.Factory(); JobLeaderIdService jobLeaderIdService = new JobLeaderIdService( testingHAServices, rpcService.getScheduledExecutor(), @@ -121,6 +120,11 @@ public class TaskExecutorITCase extends TestLogger { final TaskSlotTable taskSlotTable = new TaskSlotTable(Arrays.asList(resourceProfile), new TimerService<AllocationID>(scheduledExecutorService, 100L)); final JobManagerTable jobManagerTable = new JobManagerTable(); final JobLeaderService jobLeaderService = new JobLeaderService(taskManagerLocation); + final SlotManager slotManager = new SlotManager( + rpcService.getScheduledExecutor(), + TestingUtils.infiniteTime(), + TestingUtils.infiniteTime(), + TestingUtils.infiniteTime()); ResourceManager<ResourceID> resourceManager = new StandaloneResourceManager( rpcService, @@ -129,7 +133,7 @@ public class TaskExecutorITCase extends TestLogger { resourceManagerConfiguration, testingHAServices, heartbeatServices, - slotManagerFactory, + slotManager, metricRegistry, jobLeaderIdService, testingFatalErrorHandler); @@ -168,7 +172,7 @@ public class TaskExecutorITCase extends TestLogger { rpcService.registerGateway(jmAddress, jmGateway); final AllocationID allocationId = new AllocationID(); - final SlotRequest slotRequest = new SlotRequest(jobId, allocationId, resourceProfile); + final SlotRequest slotRequest = new SlotRequest(jobId, allocationId, resourceProfile, jmAddress); final SlotOffer slotOffer = new SlotOffer(allocationId, 0, resourceProfile); try {
http://git-wip-us.apache.org/repos/asf/flink/blob/59aefb57/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java index 1d1840e..d3d4d43 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java @@ -65,10 +65,8 @@ import org.apache.flink.runtime.metrics.groups.TaskMetricGroup; import org.apache.flink.runtime.query.TaskKvStateRegistry; import org.apache.flink.runtime.registration.RegistrationResponse; import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; -import org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequestRegistered; -import org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequestRejected; -import org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequestReply; import org.apache.flink.runtime.rpc.TestingSerialRpcService; +import org.apache.flink.runtime.taskexecutor.exceptions.SlotAllocationException; import org.apache.flink.runtime.taskexecutor.slot.SlotOffer; import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable; import org.apache.flink.runtime.taskexecutor.slot.TimerService; @@ -101,6 +99,7 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import static org.mockito.Mockito.*; public class TaskExecutorTest extends TestLogger { @@ -703,10 +702,7 @@ public class TaskExecutorTest extends TestLogger { resourceManagerLeaderRetrievalService.notifyListener(resourceManagerAddress, resourceManagerLeaderId); // request slots from the task manager under the given allocation id - TMSlotRequestReply reply = taskManager.requestSlot(slotId, jobId, allocationId, jobManagerAddress, resourceManagerLeaderId); - - // this is hopefully successful :-) - assertTrue(reply instanceof TMSlotRequestRegistered); + taskManager.requestSlot(slotId, jobId, allocationId, jobManagerAddress, resourceManagerLeaderId); // now inform the task manager about the new job leader jobManagerLeaderRetrievalService.notifyListener(jobManagerAddress, jobManagerLeaderId); @@ -820,7 +816,11 @@ public class TaskExecutorTest extends TestLogger { // been properly started. jobLeaderService.addJob(jobId, jobManagerAddress); - verify(resourceManagerGateway).notifySlotAvailable(eq(resourceManagerLeaderId), eq(registrationId), eq(new SlotID(resourceId, 1))); + verify(resourceManagerGateway).notifySlotAvailable( + eq(resourceManagerLeaderId), + eq(registrationId), + eq(new SlotID(resourceId, 1)), + eq(allocationId2)); assertTrue(taskSlotTable.existsActiveSlot(jobId, allocationId1)); assertFalse(taskSlotTable.existsActiveSlot(jobId, allocationId2)); @@ -903,15 +903,19 @@ public class TaskExecutorTest extends TestLogger { // test that allocating a slot works final SlotID slotID = new SlotID(resourceID, 0); - TMSlotRequestReply tmSlotRequestReply = taskManager.requestSlot(slotID, jobId, new AllocationID(), jobManagerAddress, leaderId); - assertTrue(tmSlotRequestReply instanceof TMSlotRequestRegistered); + taskManager.requestSlot(slotID, jobId, new AllocationID(), jobManagerAddress, leaderId); // TODO: Figure out the concrete allocation behaviour between RM and TM. Maybe we don't need the SlotID... // test that we can't allocate slots which are blacklisted due to pending confirmation of the RM final SlotID unconfirmedFreeSlotID = new SlotID(resourceID, 1); - TMSlotRequestReply tmSlotRequestReply2 = + + try { taskManager.requestSlot(unconfirmedFreeSlotID, jobId, new AllocationID(), jobManagerAddress, leaderId); - assertTrue(tmSlotRequestReply2 instanceof TMSlotRequestRejected); + + fail("The slot request should have failed."); + } catch (SlotAllocationException e) { + // expected + } // re-register verify(rmGateway1).registerTaskExecutor( @@ -920,9 +924,7 @@ public class TaskExecutorTest extends TestLogger { // now we should be successful because the slots status has been synced // test that we can't allocate slots which are blacklisted due to pending confirmation of the RM - TMSlotRequestReply tmSlotRequestReply3 = - taskManager.requestSlot(unconfirmedFreeSlotID, jobId, new AllocationID(), jobManagerAddress, leaderId); - assertTrue(tmSlotRequestReply3 instanceof TMSlotRequestRegistered); + taskManager.requestSlot(unconfirmedFreeSlotID, jobId, new AllocationID(), jobManagerAddress, leaderId); // check if a concurrent error occurred testingFatalErrorHandler.rethrowError(); @@ -1100,7 +1102,11 @@ public class TaskExecutorTest extends TestLogger { // acknowledge the offered slots offerResultFuture.complete(Collections.singleton(offer1)); - verify(resourceManagerGateway).notifySlotAvailable(eq(resourceManagerLeaderId), eq(registrationId), eq(new SlotID(resourceId, 1))); + verify(resourceManagerGateway).notifySlotAvailable( + eq(resourceManagerLeaderId), + eq(registrationId), + eq(new SlotID(resourceId, 1)), + any(AllocationID.class)); assertTrue(taskSlotTable.existsActiveSlot(jobId, allocationId1)); assertFalse(taskSlotTable.existsActiveSlot(jobId, allocationId2)); http://git-wip-us.apache.org/repos/asf/flink/blob/59aefb57/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala index f58faf2..03b5172 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala @@ -28,11 +28,13 @@ import com.google.common.util.concurrent.MoreExecutors import com.typesafe.config.ConfigFactory import grizzled.slf4j.Logger import org.apache.flink.api.common.JobExecutionResult +import org.apache.flink.api.common.time.Time import org.apache.flink.configuration.{ConfigConstants, Configuration, HighAvailabilityOptions, TaskManagerOptions} import org.apache.flink.runtime.akka.AkkaUtils import org.apache.flink.runtime.client.JobClient import org.apache.flink.runtime.clusterframework.FlinkResourceManager import org.apache.flink.runtime.clusterframework.types.ResourceID +import org.apache.flink.runtime.concurrent.{ScheduledExecutor, ScheduledExecutorServiceAdapter} import org.apache.flink.runtime.highavailability.HighAvailabilityServices import org.apache.flink.runtime.instance.{ActorGateway, AkkaActorGateway} import org.apache.flink.runtime.jobgraph.JobGraph @@ -78,6 +80,10 @@ object TestingUtils { } def getDefaultTestingActorSystemConfig = testConfig + + def infiniteTime: Time = { + Time.milliseconds(Long.MaxValue); + } def startTestingCluster(numSlots: Int, numTMs: Int = 1, @@ -114,6 +120,12 @@ object TestingUtils { } } + def defaultScheduledExecutor: ScheduledExecutor = { + val scheduledExecutorService = defaultExecutor + + new ScheduledExecutorServiceAdapter(scheduledExecutorService) + } + /** Returns an [[ExecutionContext]] which uses the current thread to execute the runnable. * * @return Direct [[ExecutionContext]] which executes runnables directly http://git-wip-us.apache.org/repos/asf/flink/blob/59aefb57/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java index 65d12b5..a6b66d7 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java @@ -208,7 +208,7 @@ public class YarnFlinkApplicationMasterRunner extends AbstractYarnFlinkApplicati resourceManagerConfiguration, haServices, heartbeatServices, - resourceManagerRuntimeServices.getSlotManagerFactory(), + resourceManagerRuntimeServices.getSlotManager(), metricRegistry, resourceManagerRuntimeServices.getJobLeaderIdService(), this); http://git-wip-us.apache.org/repos/asf/flink/blob/59aefb57/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java index 74359c8..63e6a4c 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java @@ -28,12 +28,13 @@ import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.clusterframework.types.ResourceProfile; import org.apache.flink.runtime.heartbeat.HeartbeatServices; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; +import org.apache.flink.runtime.instance.InstanceID; import org.apache.flink.runtime.metrics.MetricRegistry; import org.apache.flink.runtime.resourcemanager.JobLeaderIdService; import org.apache.flink.runtime.resourcemanager.ResourceManager; import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration; import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException; -import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerFactory; +import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager; import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.runtime.rpc.RpcService; import org.apache.hadoop.yarn.api.ApplicationConstants; @@ -115,7 +116,7 @@ public class YarnResourceManager extends ResourceManager<ResourceID> implements ResourceManagerConfiguration resourceManagerConfiguration, HighAvailabilityServices highAvailabilityServices, HeartbeatServices heartbeatServices, - SlotManagerFactory slotManagerFactory, + SlotManager slotManager, MetricRegistry metricRegistry, JobLeaderIdService jobLeaderIdService, FatalErrorHandler fatalErrorHandler) { @@ -126,7 +127,7 @@ public class YarnResourceManager extends ResourceManager<ResourceID> implements resourceManagerConfiguration, highAvailabilityServices, heartbeatServices, - slotManagerFactory, + slotManager, metricRegistry, jobLeaderIdService, fatalErrorHandler); @@ -223,6 +224,11 @@ public class YarnResourceManager extends ResourceManager<ResourceID> implements } @Override + public void stopWorker(InstanceID instanceId) { + // TODO: Implement to stop the worker + } + + @Override protected ResourceID workerStarted(ResourceID resourceID) { return resourceID; }