GJL commented on a change in pull request #11552: [FLINK-16836] Clear 
rpcConnection field in JobManagerLeaderListener when target loses leadership
URL: https://github.com/apache/flink/pull/11552#discussion_r400719913
 
 

 ##########
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/JobLeaderServiceTest.java
 ##########
 @@ -96,21 +100,77 @@ public void go() throws Exception {
                addJobAction.sync();
        }
 
+       /**
+        * Tests that the JobLeaderService won't try to reconnect to JobMaster 
after it
+        * has lost the leadership. See FLINK-16836.
+        */
+       @Test
+       public void doesNotReconnectAfterTargetLostLeadership() throws 
Exception {
+               final JobLeaderService jobLeaderService = new JobLeaderService(
+                       new LocalUnresolvedTaskManagerLocation(),
+                       
RetryingRegistrationConfiguration.defaultConfiguration());
+
+               final JobID jobId = new JobID();
+
+               final SettableLeaderRetrievalService leaderRetrievalService = 
new SettableLeaderRetrievalService();
+               final TestingHighAvailabilityServices haServices = new 
TestingHighAvailabilityServicesBuilder()
+                       .setJobMasterLeaderRetrieverFunction(ignored -> 
leaderRetrievalService)
+                       .build();
+
+               final String jmAddress = "foobar";
+               final TestingJobMasterGateway jobMasterGateway = new 
TestingJobMasterGatewayBuilder().build();
+               
rpcServiceResource.getTestingRpcService().registerGateway(jmAddress, 
jobMasterGateway);
+
+               final TestingJobLeaderListener testingJobLeaderListener = new 
TestingJobLeaderListener();
+               jobLeaderService.start(
+                       "foobar",
+                       rpcServiceResource.getTestingRpcService(),
+                       haServices,
+                       testingJobLeaderListener);
+
+               try {
+                       jobLeaderService.addJob(jobId, jmAddress);
+
+                       leaderRetrievalService.notifyListener(jmAddress, 
UUID.randomUUID());
+
+                       
testingJobLeaderListener.waitUntilJobManagerGainedLeadership();
+
+                       // revoke the leadership
+                       leaderRetrievalService.notifyListener(null, null);
+                       
testingJobLeaderListener.waitUntilJobManagerLostLeadership();
+
+                       jobLeaderService.reconnect(jobId);
+               } finally {
+                       jobLeaderService.stop();
+               }
+       }
+
        private static final class TestingJobLeaderListener implements 
JobLeaderListener {
 
+               private final CountDownLatch jobManagerGainedLeadership = new 
CountDownLatch(1);
+               private final CountDownLatch jobManagerLostLeadershp = new 
CountDownLatch(1);
+
                @Override
                public void jobManagerGainedLeadership(JobID jobId, 
JobMasterGateway jobManagerGateway, JMTMRegistrationSuccess 
registrationMessage) {
-                       // ignored
+                       jobManagerGainedLeadership.countDown();
                }
 
                @Override
                public void jobManagerLostLeadership(JobID jobId, JobMasterId 
jobMasterId) {
-                       // ignored
+                       jobManagerLostLeadershp.countDown();
                }
 
                @Override
                public void handleError(Throwable throwable) {
                        // ignored
                }
+
+               private void waitUntilJobManagerGainedLeadership() throws 
InterruptedException {
+                       jobManagerGainedLeadership.await();
+               }
+
+               private void waitUntilJobManagerLostLeadership() throws 
InterruptedException {
+                       jobManagerLostLeadershp.await();
 
 Review comment:
   ```suggestion
                        jobManagerLostLeadership.await();
   ```

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to