http://git-wip-us.apache.org/repos/asf/flink/blob/59aefb57/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
index 579ca3a..6a0bd87 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
@@ -47,13 +47,13 @@ import 
org.apache.flink.runtime.resourcemanager.ResourceManager;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
 import org.apache.flink.runtime.resourcemanager.SlotRequest;
 import org.apache.flink.runtime.resourcemanager.StandaloneResourceManager;
-import org.apache.flink.runtime.resourcemanager.slotmanager.DefaultSlotManager;
-import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerFactory;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
 import org.apache.flink.runtime.rpc.TestingSerialRpcService;
 import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
 import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable;
 import org.apache.flink.runtime.taskexecutor.slot.TimerService;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.util.TestingFatalErrorHandler;
 import org.apache.flink.util.TestLogger;
 import org.hamcrest.Matchers;
@@ -102,7 +102,6 @@ public class TaskExecutorITCase extends TestLogger {
                ResourceManagerConfiguration resourceManagerConfiguration = new 
ResourceManagerConfiguration(
                        Time.milliseconds(500L),
                        Time.milliseconds(500L));
-               SlotManagerFactory slotManagerFactory = new 
DefaultSlotManager.Factory();
                JobLeaderIdService jobLeaderIdService = new JobLeaderIdService(
                        testingHAServices,
                        rpcService.getScheduledExecutor(),
@@ -121,6 +120,11 @@ public class TaskExecutorITCase extends TestLogger {
                final TaskSlotTable taskSlotTable = new 
TaskSlotTable(Arrays.asList(resourceProfile), new 
TimerService<AllocationID>(scheduledExecutorService, 100L));
                final JobManagerTable jobManagerTable = new JobManagerTable();
                final JobLeaderService jobLeaderService = new 
JobLeaderService(taskManagerLocation);
+               final SlotManager slotManager = new SlotManager(
+                       rpcService.getScheduledExecutor(),
+                       TestingUtils.infiniteTime(),
+                       TestingUtils.infiniteTime(),
+                       TestingUtils.infiniteTime());
 
                ResourceManager<ResourceID> resourceManager = new 
StandaloneResourceManager(
                        rpcService,
@@ -129,7 +133,7 @@ public class TaskExecutorITCase extends TestLogger {
                        resourceManagerConfiguration,
                        testingHAServices,
                        heartbeatServices,
-                       slotManagerFactory,
+                       slotManager,
                        metricRegistry,
                        jobLeaderIdService,
                        testingFatalErrorHandler);
@@ -168,7 +172,7 @@ public class TaskExecutorITCase extends TestLogger {
                rpcService.registerGateway(jmAddress, jmGateway);
 
                final AllocationID allocationId = new AllocationID();
-               final SlotRequest slotRequest = new SlotRequest(jobId, 
allocationId, resourceProfile);
+               final SlotRequest slotRequest = new SlotRequest(jobId, 
allocationId, resourceProfile, jmAddress);
                final SlotOffer slotOffer = new SlotOffer(allocationId, 0, 
resourceProfile);
 
                try {

http://git-wip-us.apache.org/repos/asf/flink/blob/59aefb57/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
index 1d1840e..d3d4d43 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
@@ -65,10 +65,8 @@ import 
org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.registration.RegistrationResponse;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
-import 
org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequestRegistered;
-import 
org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequestRejected;
-import 
org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequestReply;
 import org.apache.flink.runtime.rpc.TestingSerialRpcService;
+import 
org.apache.flink.runtime.taskexecutor.exceptions.SlotAllocationException;
 import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
 import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable;
 import org.apache.flink.runtime.taskexecutor.slot.TimerService;
@@ -101,6 +99,7 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 import static org.mockito.Mockito.*;
 
 public class TaskExecutorTest extends TestLogger {
@@ -703,10 +702,7 @@ public class TaskExecutorTest extends TestLogger {
                        
resourceManagerLeaderRetrievalService.notifyListener(resourceManagerAddress, 
resourceManagerLeaderId);
 
                        // request slots from the task manager under the given 
allocation id
-                       TMSlotRequestReply reply = 
taskManager.requestSlot(slotId, jobId, allocationId, jobManagerAddress, 
resourceManagerLeaderId);
-
-                       // this is hopefully successful :-)
-                       assertTrue(reply instanceof TMSlotRequestRegistered);
+                       taskManager.requestSlot(slotId, jobId, allocationId, 
jobManagerAddress, resourceManagerLeaderId);
 
                        // now inform the task manager about the new job leader
                        
jobManagerLeaderRetrievalService.notifyListener(jobManagerAddress, 
jobManagerLeaderId);
@@ -820,7 +816,11 @@ public class TaskExecutorTest extends TestLogger {
                        // been properly started.
                        jobLeaderService.addJob(jobId, jobManagerAddress);
 
-                       
verify(resourceManagerGateway).notifySlotAvailable(eq(resourceManagerLeaderId), 
eq(registrationId), eq(new SlotID(resourceId, 1)));
+                       verify(resourceManagerGateway).notifySlotAvailable(
+                               eq(resourceManagerLeaderId),
+                               eq(registrationId),
+                               eq(new SlotID(resourceId, 1)),
+                               eq(allocationId2));
 
                        assertTrue(taskSlotTable.existsActiveSlot(jobId, 
allocationId1));
                        assertFalse(taskSlotTable.existsActiveSlot(jobId, 
allocationId2));
@@ -903,15 +903,19 @@ public class TaskExecutorTest extends TestLogger {
 
                        // test that allocating a slot works
                        final SlotID slotID = new SlotID(resourceID, 0);
-                       TMSlotRequestReply tmSlotRequestReply = 
taskManager.requestSlot(slotID, jobId, new AllocationID(), jobManagerAddress, 
leaderId);
-                       assertTrue(tmSlotRequestReply instanceof 
TMSlotRequestRegistered);
+                       taskManager.requestSlot(slotID, jobId, new 
AllocationID(), jobManagerAddress, leaderId);
 
                        // TODO: Figure out the concrete allocation behaviour 
between RM and TM. Maybe we don't need the SlotID...
                        // test that we can't allocate slots which are 
blacklisted due to pending confirmation of the RM
                        final SlotID unconfirmedFreeSlotID = new 
SlotID(resourceID, 1);
-                       TMSlotRequestReply tmSlotRequestReply2 =
+
+                       try {
                                taskManager.requestSlot(unconfirmedFreeSlotID, 
jobId, new AllocationID(), jobManagerAddress, leaderId);
-                       assertTrue(tmSlotRequestReply2 instanceof 
TMSlotRequestRejected);
+
+                               fail("The slot request should have failed.");
+                       } catch (SlotAllocationException e) {
+                               // expected
+                       }
 
                        // re-register
                        verify(rmGateway1).registerTaskExecutor(
@@ -920,9 +924,7 @@ public class TaskExecutorTest extends TestLogger {
 
                        // now we should be successful because the slots status 
has been synced
                        // test that we can't allocate slots which are 
blacklisted due to pending confirmation of the RM
-                       TMSlotRequestReply tmSlotRequestReply3 =
-                               taskManager.requestSlot(unconfirmedFreeSlotID, 
jobId, new AllocationID(), jobManagerAddress, leaderId);
-                       assertTrue(tmSlotRequestReply3 instanceof 
TMSlotRequestRegistered);
+                       taskManager.requestSlot(unconfirmedFreeSlotID, jobId, 
new AllocationID(), jobManagerAddress, leaderId);
 
                        // check if a concurrent error occurred
                        testingFatalErrorHandler.rethrowError();
@@ -1100,7 +1102,11 @@ public class TaskExecutorTest extends TestLogger {
                        // acknowledge the offered slots
                        
offerResultFuture.complete(Collections.singleton(offer1));
 
-                       
verify(resourceManagerGateway).notifySlotAvailable(eq(resourceManagerLeaderId), 
eq(registrationId), eq(new SlotID(resourceId, 1)));
+                       verify(resourceManagerGateway).notifySlotAvailable(
+                               eq(resourceManagerLeaderId),
+                               eq(registrationId),
+                               eq(new SlotID(resourceId, 1)),
+                               any(AllocationID.class));
 
                        assertTrue(taskSlotTable.existsActiveSlot(jobId, 
allocationId1));
                        assertFalse(taskSlotTable.existsActiveSlot(jobId, 
allocationId2));

http://git-wip-us.apache.org/repos/asf/flink/blob/59aefb57/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
index f58faf2..03b5172 100644
--- 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
+++ 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
@@ -28,11 +28,13 @@ import com.google.common.util.concurrent.MoreExecutors
 import com.typesafe.config.ConfigFactory
 import grizzled.slf4j.Logger
 import org.apache.flink.api.common.JobExecutionResult
+import org.apache.flink.api.common.time.Time
 import org.apache.flink.configuration.{ConfigConstants, Configuration, 
HighAvailabilityOptions, TaskManagerOptions}
 import org.apache.flink.runtime.akka.AkkaUtils
 import org.apache.flink.runtime.client.JobClient
 import org.apache.flink.runtime.clusterframework.FlinkResourceManager
 import org.apache.flink.runtime.clusterframework.types.ResourceID
+import org.apache.flink.runtime.concurrent.{ScheduledExecutor, 
ScheduledExecutorServiceAdapter}
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices
 import org.apache.flink.runtime.instance.{ActorGateway, AkkaActorGateway}
 import org.apache.flink.runtime.jobgraph.JobGraph
@@ -78,6 +80,10 @@ object TestingUtils {
   }
 
   def getDefaultTestingActorSystemConfig = testConfig
+
+  def infiniteTime: Time = {
+    Time.milliseconds(Long.MaxValue);
+  }
   
 
   def startTestingCluster(numSlots: Int, numTMs: Int = 1,
@@ -114,6 +120,12 @@ object TestingUtils {
     }
   }
 
+  def defaultScheduledExecutor: ScheduledExecutor = {
+    val scheduledExecutorService = defaultExecutor
+
+    new ScheduledExecutorServiceAdapter(scheduledExecutorService)
+  }
+
   /** Returns an [[ExecutionContext]] which uses the current thread to execute 
the runnable.
     *
     * @return Direct [[ExecutionContext]] which executes runnables directly

http://git-wip-us.apache.org/repos/asf/flink/blob/59aefb57/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
 
b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
index 65d12b5..a6b66d7 100644
--- 
a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
+++ 
b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
@@ -208,7 +208,7 @@ public class YarnFlinkApplicationMasterRunner extends 
AbstractYarnFlinkApplicati
                        resourceManagerConfiguration,
                        haServices,
                        heartbeatServices,
-                       resourceManagerRuntimeServices.getSlotManagerFactory(),
+                       resourceManagerRuntimeServices.getSlotManager(),
                        metricRegistry,
                        resourceManagerRuntimeServices.getJobLeaderIdService(),
                        this);

http://git-wip-us.apache.org/repos/asf/flink/blob/59aefb57/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java 
b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
index 74359c8..63e6a4c 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
@@ -28,12 +28,13 @@ import 
org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
 import org.apache.flink.runtime.resourcemanager.ResourceManager;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
 import 
org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
-import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerFactory;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
@@ -115,7 +116,7 @@ public class YarnResourceManager extends 
ResourceManager<ResourceID> implements
                        ResourceManagerConfiguration 
resourceManagerConfiguration,
                        HighAvailabilityServices highAvailabilityServices,
                        HeartbeatServices heartbeatServices,
-                       SlotManagerFactory slotManagerFactory,
+                       SlotManager slotManager,
                        MetricRegistry metricRegistry,
                        JobLeaderIdService jobLeaderIdService,
                        FatalErrorHandler fatalErrorHandler) {
@@ -126,7 +127,7 @@ public class YarnResourceManager extends 
ResourceManager<ResourceID> implements
                        resourceManagerConfiguration,
                        highAvailabilityServices,
                        heartbeatServices,
-                       slotManagerFactory,
+                       slotManager,
                        metricRegistry,
                        jobLeaderIdService,
                        fatalErrorHandler);
@@ -223,6 +224,11 @@ public class YarnResourceManager extends 
ResourceManager<ResourceID> implements
        }
 
        @Override
+       public void stopWorker(InstanceID instanceId) {
+               // TODO: Implement to stop the worker
+       }
+
+       @Override
        protected ResourceID workerStarted(ResourceID resourceID) {
                return resourceID;
        }

Reply via email to