XComp commented on code in PR #22380: URL: https://github.com/apache/flink/pull/22380#discussion_r1169638735
##########
flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionServiceTest.java:
##########
@@ -69,6 +71,118 @@ void testOnGrantAndRevokeLeadership() throws Exception {
         };
     }
+    @Test
+    void testDelayedGrantCallAfterContenderRegistration() throws Exception {
+        final TestingLeaderElectionDriver.TestingLeaderElectionDriverFactory driverFactory =
+                new TestingLeaderElectionDriver.TestingLeaderElectionDriverFactory();
+        try (final DefaultLeaderElectionService testInstance =
+                new DefaultLeaderElectionService(driverFactory)) {
+            testInstance.startLeaderElectionBackend();
+
+            final TestingLeaderElectionDriver driver = driverFactory.getCurrentLeaderDriver();
+            assertThat(driver).isNotNull();
+
+            final CompletableFuture<Void> grantLeadershipFuture = new CompletableFuture<>();
+            driver.isLeader(grantLeadershipFuture);
+
+            final TestingContender contender = new TestingContender("unused-address", testInstance);
+            testInstance.start(contender);
+
+            assertThat(testInstance.getLeaderSessionID())
+                    .as("Leadership grant was not forwarded to the contender, yet.")
+                    .isNull();
+
+            grantLeadershipFuture.complete(null);
+
+            contender.waitForLeader();
+
+            testInstance.stop();
+        }
+    }
+
+    /**
+     * Test to cover the issue described in FLINK-31814. This test could be removed after
+     * FLINK-31814 is resolved.
+     */
+    @Test
+    void testOnRevokeCallWhileClosingService() throws Exception {
+        final TestingLeaderElectionDriver.TestingLeaderElectionDriverFactory driverFactory =
+                new TestingLeaderElectionDriver.TestingLeaderElectionDriverFactory(
+                        LeaderElectionEventHandler::onRevokeLeadership);
+
+        try (final DefaultLeaderElectionService testInstance =
+                new DefaultLeaderElectionService(driverFactory)) {
+            testInstance.startLeaderElectionBackend();
+
+            final TestingLeaderElectionDriver driver = driverFactory.getCurrentLeaderDriver();
+            assertThat(driver).isNotNull();
+
+            driver.isLeader();
+
+            final TestingContender contender = new TestingContender("unused-address", testInstance);
+            testInstance.start(contender);
+
+            contender.waitForLeader();
+
+            testInstance.stop();
+        }
+    }
+
+    /**
+     * This issue can happen when the shutdown of the contender takes too long and the leadership is
+     * re-acquired in the meantime (see FLINK-29234).
+     */
+    @Test
+    void testStopWhileHavingLeadership() throws Exception {
+        final TestingLeaderElectionDriver.TestingLeaderElectionDriverFactory driverFactory =
+                new TestingLeaderElectionDriver.TestingLeaderElectionDriverFactory();
+
+        try (final DefaultLeaderElectionService testInstance =
+                new DefaultLeaderElectionService(driverFactory)) {
+            testInstance.startLeaderElectionBackend();

Review Comment:
Initially, I thought of putting it in the public interface, but I realized that it's specific to the `DefaultLeaderElectionService` implementation and therefore removed it from the `LeaderElectionService` interface. It just didn't cross my mind to call the method in the constructor. But it's a valid point now that it's out of the interface. I will change it. :+1:
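For illustration only, here is a minimal sketch of the idea discussed in the comment: once the start method is no longer part of the public interface, the implementation can start its backend in the constructor so callers cannot forget the extra call. All type names below (`ElectionService`, `DriverFactory`, `SketchElectionService`) are hypothetical stand-ins and not Flink's actual classes or signatures.

```java
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical stand-ins only; none of these types are Flink's real classes. */
public class ConstructorStartSketch {

    /** Public contract: deliberately exposes no "start backend" method. */
    interface ElectionService {
        boolean isRunning();
    }

    /** Hypothetical factory, loosely analogous to the driver factory used in the tests. */
    interface DriverFactory {
        AutoCloseable createDriver() throws Exception;
    }

    /** Implementation that starts its backend while being constructed. */
    static final class SketchElectionService implements ElectionService, AutoCloseable {
        private final AutoCloseable driver;
        private final AtomicBoolean running = new AtomicBoolean(false);

        SketchElectionService(DriverFactory factory) throws Exception {
            // The implementation-specific start step happens here, so no
            // separate start call is needed after construction.
            this.driver = factory.createDriver();
            this.running.set(true);
        }

        @Override
        public boolean isRunning() {
            return running.get();
        }

        @Override
        public void close() throws Exception {
            // Close the driver at most once, even if close() is called repeatedly.
            if (running.compareAndSet(true, false)) {
                driver.close();
            }
        }
    }

    public static void main(String[] args) throws Exception {
        AtomicBoolean driverClosed = new AtomicBoolean(false);
        try (SketchElectionService service =
                new SketchElectionService(() -> () -> driverClosed.set(true))) {
            System.out.println("running after construction: " + service.isRunning());
        }
        System.out.println("driver closed after try block: " + driverClosed.get());
    }
}
```

With this shape, a test would only need the try-with-resources block; the explicit start call seen in the diff above would become unnecessary. Whether the real change takes this form is up to the PR.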