Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6067#discussion_r190602158
  
    --- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
 ---
    @@ -1483,6 +1485,216 @@ public void 
testMaximumRegistrationDurationAfterConnectionLoss() throws Exceptio
                }
        }
     
    +   /**
    +    * Tests that we ignore slot requests if the TaskExecutor is not
    +    * registered at a ResourceManager.
    +    */
    +   @Test
    +   public void testIgnoringSlotRequestsIfNotRegistered() throws Exception {
    +           final TaskSlotTable taskSlotTable = new 
TaskSlotTable(Collections.singleton(ResourceProfile.UNKNOWN), timerService);
    +           final TaskManagerServices taskManagerServices = new 
TaskManagerServicesBuilder().setTaskSlotTable(taskSlotTable).build();
    +
    +           final TaskExecutor taskExecutor = 
createTaskExecutor(taskManagerServices);
    +
    +           taskExecutor.start();
    +
    +           try {
    +                   final TestingResourceManagerGateway 
testingResourceManagerGateway = new TestingResourceManagerGateway();
    +
    +                   final CompletableFuture<RegistrationResponse> 
registrationFuture = new CompletableFuture<>();
    +                   final CompletableFuture<ResourceID> 
taskExecutorResourceIdFuture = new CompletableFuture<>();
    +
    +                   
testingResourceManagerGateway.setRegisterTaskExecutorFunction(stringResourceIDSlotReportIntegerHardwareDescriptionTuple5
 -> {
    +                
taskExecutorResourceIdFuture.complete(stringResourceIDSlotReportIntegerHardwareDescriptionTuple5.f1);
    +                return registrationFuture;
    +            });
    +
    +                   
rpc.registerGateway(testingResourceManagerGateway.getAddress(), 
testingResourceManagerGateway);
    +                   
resourceManagerLeaderRetriever.notifyListener(testingResourceManagerGateway.getAddress(),
 testingResourceManagerGateway.getFencingToken().toUUID());
    +
    +                   final TaskExecutorGateway taskExecutorGateway = 
taskExecutor.getSelfGateway(TaskExecutorGateway.class);
    +
    +                   final ResourceID resourceId = 
taskExecutorResourceIdFuture.get();
    +
    +                   final SlotID slotId = new SlotID(resourceId, 0);
    +                   final CompletableFuture<Acknowledge> 
slotRequestResponse = taskExecutorGateway.requestSlot(slotId, jobId, new 
AllocationID(), "foobar", testingResourceManagerGateway.getFencingToken(), 
timeout);
    +
    +                   try {
    +                           slotRequestResponse.get();
    +                           fail("We should not be able to request slots 
before the TaskExecutor is registered at the ResourceManager.");
    +                   } catch (ExecutionException ee) {
    +                           
assertThat(ExceptionUtils.stripExecutionException(ee), 
instanceOf(TaskManagerException.class));
    +                   }
    +           } finally {
    +                   RpcUtils.terminateRpcEndpoint(taskExecutor, timeout);
    +           }
    +   }
    +
    +   /**
    +    * Tests that the TaskExecutor tries to reconnect to a ResourceManager 
from which it
    +    * was explicitly disconnected.
    +    */
    +   @Test
    +   public void testReconnectionAttemptIfExplicitlyDisconnected() throws 
Exception {
    +           final long heartbeatInterval = 1000L;
    +           final TaskSlotTable taskSlotTable = new 
TaskSlotTable(Collections.singleton(ResourceProfile.UNKNOWN), timerService);
    +           final TaskManagerLocation taskManagerLocation = new 
LocalTaskManagerLocation();
    +           final TaskExecutor taskExecutor = new TaskExecutor(
    +                   rpc,
    +                   
TaskManagerConfiguration.fromConfiguration(configuration),
    +                   haServices,
    +                   new TaskManagerServicesBuilder()
    +                           .setTaskSlotTable(taskSlotTable)
    +                           .setTaskManagerLocation(taskManagerLocation)
    +                           .build(),
    +                   new HeartbeatServices(heartbeatInterval, 1000L),
    +                   
UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
    +                   dummyBlobCacheService,
    +                   testingFatalErrorHandler);
    +
    +           taskExecutor.start();
    +
    +           try {
    +                   final TestingResourceManagerGateway 
testingResourceManagerGateway = new TestingResourceManagerGateway();
    +                   final ClusterInformation clusterInformation = new 
ClusterInformation("foobar", 1234);
    +                   final CompletableFuture<RegistrationResponse> 
registrationResponseFuture = CompletableFuture.completedFuture(new 
TaskExecutorRegistrationSuccess(new InstanceID(), ResourceID.generate(), 
heartbeatInterval, clusterInformation));
    +                   final BlockingQueue<ResourceID> registrationQueue = new 
ArrayBlockingQueue<>(1);
    +
    +                   
testingResourceManagerGateway.setRegisterTaskExecutorFunction(stringResourceIDSlotReportIntegerHardwareDescriptionTuple5
 -> {
    +                
registrationQueue.offer(stringResourceIDSlotReportIntegerHardwareDescriptionTuple5.f1);
    +                return registrationResponseFuture;
    +            });
    +                   
rpc.registerGateway(testingResourceManagerGateway.getAddress(), 
testingResourceManagerGateway);
    +
    +                   
resourceManagerLeaderRetriever.notifyListener(testingResourceManagerGateway.getAddress(),
 testingResourceManagerGateway.getFencingToken().toUUID());
    +
    +                   final ResourceID firstRegistrationAttempt = 
registrationQueue.take();
    +
    +                   assertThat(firstRegistrationAttempt, 
equalTo(taskManagerLocation.getResourceID()));
    +
    +                   final TaskExecutorGateway taskExecutorGateway = 
taskExecutor.getSelfGateway(TaskExecutorGateway.class);
    +
    +                   assertThat(registrationQueue.isEmpty(), is(true));
    --- End diff --
    
    nit: `assertThat(registrationQueue, is(empty()));` gives better failure 
messages. As written, the `isEmpty()`/`is(true)` assertion is just a fancy way of writing `assertTrue()`.


---

Reply via email to