Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5318#discussion_r162867968
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java ---
    @@ -1425,4 +1440,137 @@ public void testFilterOutDuplicateJobMasterRegistrations() throws Exception {
                        taskExecutor.getTerminationFuture().get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
                }
        }
    +
    +   /**
    +    * Tests that the heartbeat is stopped once the TaskExecutor detects that the RM is no longer leader.
    +    *
    +    * <p>See FLINK-8462
    +    */
    +   @Test
    +   public void testRMHeartbeatStopWhenLeadershipRevoked() throws Exception {
    +           final long heartbeatInterval = 1L;
    +           final long heartbeatTimeout = 1000L;
    +           final TaskManagerConfiguration taskManagerConfiguration = TaskManagerConfiguration.fromConfiguration(new Configuration());
    +           final TaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation();
    +           final TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler();
    +           final RecordingHeartbeatServices heartbeatServices = new RecordingHeartbeatServices(heartbeatInterval, heartbeatTimeout);
    +           final ResourceID rmResourceID = ResourceID.generate();
    +
    +           final TaskSlotTable taskSlotTable = new TaskSlotTable(Collections.singleton(ResourceProfile.UNKNOWN), timerService);
    +
    +           final TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServices();
    +           final TestingLeaderRetrievalService rmLeaderRetrievalService = new TestingLeaderRetrievalService();
    +           haServices.setResourceManagerLeaderRetriever(rmLeaderRetrievalService);
    +
    +           final String rmAddress = "rm";
    +           final TestingResourceManagerGateway rmGateway = new TestingResourceManagerGateway(
    +                   ResourceManagerId.generate(),
    +                   rmResourceID,
    +                   heartbeatInterval,
    +                   rmAddress,
    +                   rmAddress);
    +
    +           final CompletableFuture<ResourceID> registeredTaskManagerFuture = new CompletableFuture<>();
    +
    +           rmGateway.setRegisterTaskExecutorFunction(
    +                   info -> {
    +                           registeredTaskManagerFuture.complete(info.f1);
    +                           return CompletableFuture.completedFuture(
    +                                   new TaskExecutorRegistrationSuccess(
    +                                           new InstanceID(),
    +                                           rmResourceID,
    +                                           heartbeatInterval));
    +                   });
    +
    +           rpc.registerGateway(rmAddress, rmGateway);
    +
    +           final TaskExecutor taskExecutor = new TaskExecutor(
    +                   rpc,
    +                   taskManagerConfiguration,
    +                   taskManagerLocation,
    +                   mock(MemoryManager.class),
    +                   mock(IOManager.class),
    +                   mock(NetworkEnvironment.class),
    +                   haServices,
    +                   heartbeatServices,
    +                   mock(TaskManagerMetricGroup.class),
    +                   mock(BroadcastVariableManager.class),
    +                   mock(FileCache.class),
    +                   taskSlotTable,
    +                   mock(JobManagerTable.class),
    +                   mock(JobLeaderService.class),
    +                   testingFatalErrorHandler);
    +
    +           try {
    +                   taskExecutor.start();
    +
    +                   rmLeaderRetrievalService.notifyListener(rmAddress, rmGateway.getFencingToken().toUUID());
    +
    +                   // wait for TM registration
    +                   assertThat(
    +                           registeredTaskManagerFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS),
    +                           org.hamcrest.Matchers.equalTo(taskManagerLocation.getResourceID()));
    +
    +                   final BlockingQueue<ResourceID> unmonitoredTargets = heartbeatServices.getUnmonitoredTargets();
    +
    +                   // let RM lose leadership
    +                   rmLeaderRetrievalService.notifyListener(null, null);
    +
    +                   // the timeout should not have triggered since it is much higher
    +                   assertThat(unmonitoredTargets.poll(100L, TimeUnit.MILLISECONDS), org.hamcrest.Matchers.equalTo(rmResourceID));
    --- End diff --
    
    I'd add a static import for `equalTo`, or simply use `assertEquals` since 
the other tests already do so.
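
    To illustrate the `assertEquals` variant, here is a minimal sketch of how the last assertion could read. It is not the PR's code: the class name, the `pollOrNull` helper, and the `String` stand-in for `ResourceID` are hypothetical, and a local `assertEquals` replaces `org.junit.Assert.assertEquals` only so that the sketch runs without JUnit on the classpath.

```java
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical, dependency-free sketch; not part of TaskExecutorTest.
class PollAssertionSketch {

    // Stand-in for org.junit.Assert.assertEquals(expected, actual).
    static void assertEquals(Object expected, Object actual) {
        if (!Objects.equals(expected, actual)) {
            throw new AssertionError("expected <" + expected + "> but was <" + actual + ">");
        }
    }

    // Hypothetical helper: BlockingQueue.poll with a timeout returns the head
    // element, or null if nothing arrives before the timeout elapses.
    static <T> T pollOrNull(BlockingQueue<T> queue, long timeoutMillis) {
        try {
            return queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while polling", e);
        }
    }

    public static void main(String[] args) {
        // A String stands in for the ResourceID used in the real test.
        String rmResourceID = "rm-resource-id";

        // Simulates the heartbeat services recording the RM as unmonitored.
        BlockingQueue<String> unmonitoredTargets = new LinkedBlockingQueue<>();
        unmonitoredTargets.offer(rmResourceID);

        // Same shape as the suggested assertion: expected value first, polled value second.
        assertEquals(rmResourceID, pollOrNull(unmonitoredTargets, 100L));

        // Once drained, the queue yields null after the timeout.
        assertEquals(null, pollOrNull(unmonitoredTargets, 10L));
    }
}
```

    In the actual test, the line would become `assertEquals(rmResourceID, unmonitoredTargets.poll(100L, TimeUnit.MILLISECONDS));`, which avoids the fully qualified `org.hamcrest.Matchers.equalTo` call altogether.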

