azagrebin commented on a change in pull request #8226: [FLINK-12181][Tests] 
Port ExecutionGraphRestartTest to new codebase
URL: https://github.com/apache/flink/pull/8226#discussion_r279341488
 
 

 ##########
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
 ##########
 @@ -733,44 +815,67 @@ public void testFailureWhileRestarting() throws 
Exception {
        //  Utilities
        // 
------------------------------------------------------------------------
 
-       private Scheduler createSchedulerWithInstances(int num, 
TaskManagerGateway taskManagerGateway) {
-               final Scheduler scheduler = new Scheduler(executor);
-               final Instance[] instances = new Instance[num];
+       private Scheduler createSchedulerWithSlots(int num, TaskManagerGateway 
taskManagerGateway, SlotPool slotPool) throws Exception {
+               setupSlotPool(slotPool, resourceManagerGateway, 
mainThreadExecutor);
+               Scheduler scheduler = setupScheduler(slotPool, 
mainThreadExecutor);
+               
slotPool.registerTaskManager(taskManagerLocation.getResourceID());
 
-               for (int i = 0; i < instances.length; i++) {
-                       instances[i] = createInstance(taskManagerGateway, 55443 
+ i);
-                       scheduler.newInstanceAvailable(instances[i]);
+               final List<SlotOffer> slotOffers = new ArrayList<>(NUM_TASKS);
+               for (int i = 0; i < num; i++) {
+                       final AllocationID allocationId = new AllocationID();
+                       final SlotOffer slotOffer = new SlotOffer(allocationId, 
0, ResourceProfile.UNKNOWN);
+                       slotOffers.add(slotOffer);
                }
 
+               slotPool.offerSlots(taskManagerLocation, taskManagerGateway, 
slotOffers);
+
                return scheduler;
        }
 
-       private static Instance createInstance(TaskManagerGateway 
taskManagerGateway, int port) {
-               final HardwareDescription resources = new 
HardwareDescription(4, 1_000_000_000, 500_000_000, 400_000_000);
-               final TaskManagerLocation location = new TaskManagerLocation(
-                       ResourceID.generate(), 
InetAddress.getLoopbackAddress(), port);
-               return new Instance(taskManagerGateway, location, new 
InstanceID(), resources, 1);
+       // 
------------------------------------------------------------------------
+
+       private static void setupSlotPool(
+               SlotPool slotPool,
+               ResourceManagerGateway resourceManagerGateway,
+               ComponentMainThreadExecutor mainThreadExecutable) throws 
Exception {
+               final String jobManagerAddress = "foobar";
+
+               slotPool.start(JobMasterId.generate(), jobManagerAddress, 
mainThreadExecutable);
+
+               slotPool.connectToResourceManager(resourceManagerGateway);
        }
 
-       // 
------------------------------------------------------------------------
+       private static Scheduler setupScheduler(
+               SlotPool slotPool,
+               ComponentMainThreadExecutor mainThreadExecutable) {
+               Scheduler scheduler = new 
SchedulerImpl(LocationPreferenceSlotSelectionStrategy.INSTANCE, slotPool);
+               scheduler.start(mainThreadExecutable);
+               return scheduler;
+       }
 
-       private Tuple2<ExecutionGraph, Instance> 
createExecutionGraph(RestartStrategy restartStrategy) throws Exception {
-               Instance instance = ExecutionGraphTestUtils.getInstance(
-                       new SimpleAckingTaskManagerGateway(),
-                       NUM_TASKS);
+       private ExecutionGraph createExecutionGraph(RestartStrategy 
restartStrategy, SlotPool slotPool) throws Exception {
+               setupSlotPool(slotPool, resourceManagerGateway, 
mainThreadExecutor);
 
 Review comment:
   The beginning of `createExecutionGraph` looks the same as just calling 
`createSchedulerWithSlots(NUM_TASKS, taskManagerGateway, slotPool)`.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to