Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2463#discussion_r77524466
  
    --- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
 ---
    @@ -0,0 +1,228 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.runtime.resourcemanager.slotmanager;
    +
    +import org.apache.flink.api.common.JobID;
    +import org.apache.flink.runtime.clusterframework.types.AllocationID;
    +import org.apache.flink.runtime.clusterframework.types.ResourceID;
    +import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
    +import org.apache.flink.runtime.clusterframework.types.SlotID;
    +import org.apache.flink.runtime.highavailability.LeaderIdRegistry;
    +import org.apache.flink.runtime.highavailability.NonHaServices;
    +import org.apache.flink.runtime.jobmaster.JobMasterGateway;
    +import org.apache.flink.runtime.resourcemanager.JobMasterRegistration;
    +import org.apache.flink.runtime.resourcemanager.RegistrationResponse;
    +import org.apache.flink.runtime.resourcemanager.ResourceManager;
    +import org.apache.flink.runtime.resourcemanager.SlotRequest;
    +import org.apache.flink.runtime.resourcemanager.SlotRequestRegistered;
    +import org.apache.flink.runtime.rpc.TestingRpcService;
    +import org.apache.flink.runtime.taskexecutor.SlotReport;
    +import org.apache.flink.runtime.taskexecutor.SlotStatus;
    +import org.apache.flink.runtime.taskexecutor.TaskExecutor;
    +import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
    +import org.junit.AfterClass;
    +import org.junit.Assert;
    +import org.junit.Before;
    +import org.junit.BeforeClass;
    +import org.junit.Test;
    +import org.mockito.Mockito;
    +import scala.concurrent.Await;
    +import scala.concurrent.Future;
    +import scala.concurrent.duration.Duration;
    +
    +import java.util.Collections;
    +import java.util.UUID;
    +import java.util.concurrent.TimeUnit;
    +
    +import static org.mockito.Matchers.any;
    +import static org.mockito.Matchers.anyLong;
    +import static org.mockito.Matchers.eq;
    +import static org.mockito.Mockito.mock;
    +import static org.mockito.Mockito.timeout;
    +import static org.mockito.Mockito.verify;
    +
    +public class SlotProtocolTest {
    +
    +   private static TestingRpcService testRpcService;
    +
    +   @BeforeClass
    +   public static void beforeClass() {
    +           testRpcService = new TestingRpcService();
    +
    +   }
    +
    +   @AfterClass
    +   public static void afterClass() {
    +           testRpcService.stopService();
    +           testRpcService = null;
    +   }
    +
    +   @Before
    +   public void beforeTest(){
    +           testRpcService.clearGateways();
    +   }
    +
    +   /**
    +    * Tests whether
    +    * 1) SlotRequest is routed to the SlotManager
    +    * 2) SlotRequest leads to a container allocation
    +    * 3) SlotRequest is confirmed
    +    * 4) Slot becomes available and TaskExecutor gets a SlotRequest
    +    */
    +   @Test
    +   public void testSlotsUnavailableRequest() throws Exception {
    +           final String rmAddress = "/rm1";
    +           final String jmAddress = "/jm1";
    +           final JobID jobID = new JobID();
    +
    +           testRpcService.registerGateway(jmAddress, 
mock(JobMasterGateway.class));
    +
    +
    +           TestingSlotManager slotManager = Mockito.spy(new 
TestingSlotManager());
    +           ResourceManager resourceManager =
    +                   new ResourceManager(testRpcService, new 
NonHaServices(rmAddress), slotManager);
    +           resourceManager.start();
    +
    +           Future<RegistrationResponse> registrationFuture =
    +                   resourceManager.registerJobMaster(new 
JobMasterRegistration(jmAddress, jobID));
    +           try {
    +                   Await.ready(registrationFuture, Duration.create(5, 
TimeUnit.SECONDS));
    +           } catch (Exception e) {
    +                   Assert.fail("JobManager registration Future didn't 
become ready.");
    +           }
    +
    +           final AllocationID allocationID = new AllocationID();
    +           final ResourceProfile resourceProfile = new 
ResourceProfile(1.0, 100);
    +
    +           SlotRequest slotRequest = new SlotRequest(jobID, allocationID, 
resourceProfile);
    +           Future<SlotRequestRegistered> slotRequestFuture =
    +                   resourceManager.getSelf().requestSlot(slotRequest);
    +
    +           // 1) SlotRequest is routed to the SlotManager
    +           verify(slotManager, timeout(5000)).requestSlot(slotRequest);
    +
    +           // 2) SlotRequest leads to a container allocation
    +           verify(slotManager, 
timeout(5000)).allocateContainer(resourceProfile);
    +
    +           // 3) SlotRequest is confirmed
    +           Assert.assertEquals(
    +                   Await.result(slotRequestFuture, Duration.create(5, 
TimeUnit.SECONDS)).getAllocationID(),
    +                   allocationID);
    +
    +           Assert.assertFalse(slotManager.isAllocated(allocationID));
    +
    +           // slot becomes available
    +           final String tmAddress = "/tm1";
    +           TaskExecutorGateway taskExecutorGateway = 
mock(TaskExecutorGateway.class);
    +           testRpcService.registerGateway(tmAddress, taskExecutorGateway);
    +
    +           final ResourceID resourceID = ResourceID.generate();
    +           final SlotID slotID = new SlotID(resourceID, 0);
    +
    +           final SlotStatus slotStatus =
    +                   new SlotStatus(slotID, resourceProfile, 
taskExecutorGateway);
    +           final SlotReport slotReport =
    +                   new SlotReport(Collections.singletonList(slotStatus), 
resourceID);
    +           // register slot at SlotManager
    +           slotManager.updateSlotStatus(slotReport);
    +
    +           // 4) Slot becomes available and TaskExecutor gets a SlotRequest
    +           verify(taskExecutorGateway, 
timeout(5000)).requestSlot(eq(allocationID), any(UUID.class));
    +   }
    +
    +   /**
    +    * Tests whether
    +    * 1) a SlotRequest is routed to the SlotManager
    +    * 2) a SlotRequest leads to an allocation of a registered slot
    +    * 3) a SlotRequest is confirmed
    +    * 4) a SlotRequest is routed to the TaskExecutor
    +    */
    +   @Test
    +   public void testSlotAvailableRequest() throws Exception {
    +           final String rmAddress = "/rm1";
    +           final String jmAddress = "/jm1";
    +           final String tmAddress = "/tm1";
    +           final JobID jobID = new JobID();
    +
    +           testRpcService.registerGateway(jmAddress, 
mock(JobMasterGateway.class));
    +
    +           TaskExecutorGateway taskExecutorGateway = 
mock(TaskExecutorGateway.class);
    +           testRpcService.registerGateway(tmAddress, taskExecutorGateway);
    +
    +           TestingSlotManager slotManager = Mockito.spy(new 
TestingSlotManager());
    +           ResourceManager resourceManager =
    +                   new ResourceManager(testRpcService, new 
NonHaServices(rmAddress), slotManager);
    +           resourceManager.start();
    +
    +           Future<RegistrationResponse> registrationFuture =
    +                   resourceManager.registerJobMaster(new 
JobMasterRegistration(jmAddress, jobID));
    +           try {
    +                   Await.ready(registrationFuture, Duration.create(5, 
TimeUnit.SECONDS));
    +           } catch (Exception e) {
    +                   Assert.fail("JobManager registration Future didn't 
become ready.");
    +           }
    +
    +           final ResourceID resourceID = ResourceID.generate();
    +           final AllocationID allocationID = new AllocationID();
    +           final ResourceProfile resourceProfile = new 
ResourceProfile(1.0, 100);
    +           final SlotID slotID = new SlotID(resourceID, 0);
    +
    +           final SlotStatus slotStatus =
    +                   new SlotStatus(slotID, resourceProfile, 
taskExecutorGateway);
    +           final SlotReport slotReport =
    +                   new SlotReport(Collections.singletonList(slotStatus), 
resourceID);
    +           // register slot at SlotManager
    +           slotManager.updateSlotStatus(slotReport);
    +
    +           SlotRequest slotRequest = new SlotRequest(jobID, allocationID, 
resourceProfile);
    +           Future<SlotRequestRegistered> slotRequestFuture =
    +                   resourceManager.getSelf().requestSlot(slotRequest);
    +
    +           // 1) a SlotRequest is routed to the SlotManager
    +           verify(slotManager, timeout(5000)).requestSlot(slotRequest);
    +
    +           // 2) a SlotRequest leads to an allocation of a registered slot
    +           Assert.assertTrue(slotManager.isAllocated(slotID));
    +           Assert.assertTrue(slotManager.isAllocated(allocationID));
    +
    +           // 3) a SlotRequest is confirmed
    +           Assert.assertEquals(
    +                   Await.result(slotRequestFuture, Duration.create(5, 
TimeUnit.SECONDS)).getAllocationID(),
    +                   allocationID);
    +
    +           // 4) a SlotRequest is routed to the TaskExecutor
    +           verify(taskExecutorGateway, 
timeout(5000)).requestSlot(eq(allocationID), any(UUID.class));
    --- End diff --
    
    I think the test should also work if you use the `TestingSerialRpcService`,
which has the advantage of removing the need to wait.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it to be, or if the feature is enabled but not working,
please contact infrastructure at infrastruct...@apache.org or file a JIRA
ticket with INFRA.
---

Reply via email to