GJL commented on a change in pull request #11552: [FLINK-16836] Clear
rpcConnection field in JobManagerLeaderListener when target loses leadership
URL: https://github.com/apache/flink/pull/11552#discussion_r400719813
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/JobLeaderServiceTest.java
##########
@@ -96,21 +100,77 @@ public void go() throws Exception {
addJobAction.sync();
}
+ /**
+ * Tests that the JobLeaderService won't try to reconnect to JobMaster
after it
+ * has lost the leadership. See FLINK-16836.
+ */
+ @Test
+ public void doesNotReconnectAfterTargetLostLeadership() throws
Exception {
+ final JobLeaderService jobLeaderService = new JobLeaderService(
+ new LocalUnresolvedTaskManagerLocation(),
+
RetryingRegistrationConfiguration.defaultConfiguration());
+
+ final JobID jobId = new JobID();
+
+ final SettableLeaderRetrievalService leaderRetrievalService =
new SettableLeaderRetrievalService();
+ final TestingHighAvailabilityServices haServices = new
TestingHighAvailabilityServicesBuilder()
+ .setJobMasterLeaderRetrieverFunction(ignored ->
leaderRetrievalService)
+ .build();
+
+ final String jmAddress = "foobar";
+ final TestingJobMasterGateway jobMasterGateway = new
TestingJobMasterGatewayBuilder().build();
+
rpcServiceResource.getTestingRpcService().registerGateway(jmAddress,
jobMasterGateway);
+
+ final TestingJobLeaderListener testingJobLeaderListener = new
TestingJobLeaderListener();
+ jobLeaderService.start(
+ "foobar",
+ rpcServiceResource.getTestingRpcService(),
+ haServices,
+ testingJobLeaderListener);
+
+ try {
+ jobLeaderService.addJob(jobId, jmAddress);
+
+ leaderRetrievalService.notifyListener(jmAddress,
UUID.randomUUID());
+
+
testingJobLeaderListener.waitUntilJobManagerGainedLeadership();
+
+ // revoke the leadership
+ leaderRetrievalService.notifyListener(null, null);
+
testingJobLeaderListener.waitUntilJobManagerLostLeadership();
+
+ jobLeaderService.reconnect(jobId);
+ } finally {
+ jobLeaderService.stop();
+ }
+ }
+
private static final class TestingJobLeaderListener implements
JobLeaderListener {
+ private final CountDownLatch jobManagerGainedLeadership = new
CountDownLatch(1);
+ private final CountDownLatch jobManagerLostLeadershp = new
CountDownLatch(1);
+
@Override
public void jobManagerGainedLeadership(JobID jobId,
JobMasterGateway jobManagerGateway, JMTMRegistrationSuccess
registrationMessage) {
- // ignored
+ jobManagerGainedLeadership.countDown();
}
@Override
public void jobManagerLostLeadership(JobID jobId, JobMasterId
jobMasterId) {
- // ignored
+ jobManagerLostLeadershp.countDown();
Review comment:
```suggestion
jobManagerLostLeadership.countDown();
```
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services