azagrebin commented on a change in pull request #8226: [FLINK-12181][Tests] Port ExecutionGraphRestartTest to new codebase URL: https://github.com/apache/flink/pull/8226#discussion_r279341488
########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java ########## @@ -733,44 +815,67 @@ public void testFailureWhileRestarting() throws Exception { // Utilities // ------------------------------------------------------------------------ - private Scheduler createSchedulerWithInstances(int num, TaskManagerGateway taskManagerGateway) { - final Scheduler scheduler = new Scheduler(executor); - final Instance[] instances = new Instance[num]; + private Scheduler createSchedulerWithSlots(int num, TaskManagerGateway taskManagerGateway, SlotPool slotPool) throws Exception { + setupSlotPool(slotPool, resourceManagerGateway, mainThreadExecutor); + Scheduler scheduler = setupScheduler(slotPool, mainThreadExecutor); + slotPool.registerTaskManager(taskManagerLocation.getResourceID()); - for (int i = 0; i < instances.length; i++) { - instances[i] = createInstance(taskManagerGateway, 55443 + i); - scheduler.newInstanceAvailable(instances[i]); + final List<SlotOffer> slotOffers = new ArrayList<>(NUM_TASKS); + for (int i = 0; i < num; i++) { + final AllocationID allocationId = new AllocationID(); + final SlotOffer slotOffer = new SlotOffer(allocationId, 0, ResourceProfile.UNKNOWN); + slotOffers.add(slotOffer); } + slotPool.offerSlots(taskManagerLocation, taskManagerGateway, slotOffers); + return scheduler; } - private static Instance createInstance(TaskManagerGateway taskManagerGateway, int port) { - final HardwareDescription resources = new HardwareDescription(4, 1_000_000_000, 500_000_000, 400_000_000); - final TaskManagerLocation location = new TaskManagerLocation( - ResourceID.generate(), InetAddress.getLoopbackAddress(), port); - return new Instance(taskManagerGateway, location, new InstanceID(), resources, 1); + // ------------------------------------------------------------------------ + + private static void setupSlotPool( + SlotPool slotPool, + ResourceManagerGateway resourceManagerGateway, + ComponentMainThreadExecutor mainThreadExecutable) throws Exception { + final String jobManagerAddress = "foobar"; + + slotPool.start(JobMasterId.generate(), jobManagerAddress, mainThreadExecutable); + + slotPool.connectToResourceManager(resourceManagerGateway); } - // ------------------------------------------------------------------------ + private static Scheduler setupScheduler( + SlotPool slotPool, + ComponentMainThreadExecutor mainThreadExecutable) { + Scheduler scheduler = new SchedulerImpl(LocationPreferenceSlotSelectionStrategy.INSTANCE, slotPool); + scheduler.start(mainThreadExecutable); + return scheduler; + } - private Tuple2<ExecutionGraph, Instance> createExecutionGraph(RestartStrategy restartStrategy) throws Exception { - Instance instance = ExecutionGraphTestUtils.getInstance( - new SimpleAckingTaskManagerGateway(), - NUM_TASKS); + private ExecutionGraph createExecutionGraph(RestartStrategy restartStrategy, SlotPool slotPool) throws Exception { + setupSlotPool(slotPool, resourceManagerGateway, mainThreadExecutor); Review comment: The beginning of `createExecutionGraph` looks like the same as just calling `createSchedulerWithSlots(NUM_TASKS, taskManagerGateway, slotPool)`. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services