Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5318#discussion_r162867968 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java --- @@ -1425,4 +1440,137 @@ public void testFilterOutDuplicateJobMasterRegistrations() throws Exception { taskExecutor.getTerminationFuture().get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS); } } + + /** + * Tests that the heartbeat is stopped once the TaskExecutor detects that the RM is no longer leader. + * + * <p>See FLINK-8462 + */ + @Test + public void testRMHeartbeatStopWhenLeadershipRevoked() throws Exception { + final long heartbeatInterval = 1L; + final long heartbeatTimeout = 1000L; + final TaskManagerConfiguration taskManagerConfiguration = TaskManagerConfiguration.fromConfiguration(new Configuration()); + final TaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation(); + final TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler(); + final RecordingHeartbeatServices heartbeatServices = new RecordingHeartbeatServices(heartbeatInterval, heartbeatTimeout); + final ResourceID rmResourceID = ResourceID.generate(); + + final TaskSlotTable taskSlotTable = new TaskSlotTable(Collections.singleton(ResourceProfile.UNKNOWN), timerService); + + final TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServices(); + final TestingLeaderRetrievalService rmLeaderRetrievalService = new TestingLeaderRetrievalService(); + haServices.setResourceManagerLeaderRetriever(rmLeaderRetrievalService); + + final String rmAddress = "rm"; + final TestingResourceManagerGateway rmGateway = new TestingResourceManagerGateway( + ResourceManagerId.generate(), + rmResourceID, + heartbeatInterval, + rmAddress, + rmAddress); + + final CompletableFuture<ResourceID> registeredTaskManagerFuture = new CompletableFuture<>(); + + rmGateway.setRegisterTaskExecutorFunction( + info -> { + registeredTaskManagerFuture.complete(info.f1); + return CompletableFuture.completedFuture( + new TaskExecutorRegistrationSuccess( + new InstanceID(), + rmResourceID, + heartbeatInterval)); + }); + + rpc.registerGateway(rmAddress, rmGateway); + + final TaskExecutor taskExecutor = new TaskExecutor( + rpc, + taskManagerConfiguration, + taskManagerLocation, + mock(MemoryManager.class), + mock(IOManager.class), + mock(NetworkEnvironment.class), + haServices, + heartbeatServices, + mock(TaskManagerMetricGroup.class), + mock(BroadcastVariableManager.class), + mock(FileCache.class), + taskSlotTable, + mock(JobManagerTable.class), + mock(JobLeaderService.class), + testingFatalErrorHandler); + + try { + taskExecutor.start(); + + rmLeaderRetrievalService.notifyListener(rmAddress, rmGateway.getFencingToken().toUUID()); + + // wait for TM registration + assertThat( + registeredTaskManagerFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS), + org.hamcrest.Matchers.equalTo(taskManagerLocation.getResourceID())); + + final BlockingQueue<ResourceID> unmonitoredTargets = heartbeatServices.getUnmonitoredTargets(); + + // let RM lose leadership + rmLeaderRetrievalService.notifyListener(null, null); + + // the timeout should not have triggered since it is much higher + assertThat(unmonitoredTargets.poll(100L, TimeUnit.MILLISECONDS), org.hamcrest.Matchers.equalTo(rmResourceID)); --- End diff -- I'd add a static import for `equalTo`, or simply use `assertEquals` since the other tests already do so.
---