kfaraz commented on code in PR #16528: URL: https://github.com/apache/druid/pull/16528#discussion_r1622740278
########## server/src/main/java/org/apache/druid/curator/discovery/CuratorDruidLeaderSelector.java: ########## @@ -65,11 +63,10 @@ public CuratorDruidLeaderSelector(CuratorFramework curator, @Self DruidNode self this.curator = curator; this.self = self; this.latchPath = latchPath; - - // Creating a LeaderLatch here allows us to query for the current leader. We will not be considered for leadership - // election until LeaderLatch.start() is called in registerListener(). This allows clients to observe the current - // leader without being involved in the election. this.leaderLatch.set(createNewLeaderLatch()); + + // Adding ConnectionStateListener to handle session changes using a method reference Review Comment: this comment is not needed, the code is self explanatory. ########## server/src/main/java/org/apache/druid/curator/discovery/CuratorDruidLeaderSelector.java: ########## @@ -80,66 +77,62 @@ private LeaderLatch createNewLeaderLatch() private LeaderLatch createNewLeaderLatchWithListener() { final LeaderLatch newLeaderLatch = createNewLeaderLatch(); + newLeaderLatch.addListener(new LeaderLatchListener() + { + @Override + public void isLeader() + { + try { + if (leader) { + log.warn("I'm being asked to become leader. But I am already the leader. Ignored event."); + return; + } - newLeaderLatch.addListener( - new LeaderLatchListener() - { - @Override - public void isLeader() - { - try { - if (leader) { - log.warn("I'm being asked to become leader. But I am already the leader. Ignored event."); - return; - } - - leader = true; - term++; - listener.becomeLeader(); - } - catch (Exception ex) { - log.makeAlert(ex, "listener becomeLeader() failed. Unable to become leader").emit(); - - // give others a chance to become leader. 
- CloseableUtils.closeAndSuppressExceptions( - createNewLeaderLatchWithListener(), - e -> log.warn("Could not close old leader latch; continuing with new one anyway.") - ); - - leader = false; - try { - //Small delay before starting the latch so that others waiting are chosen to become leader. - Thread.sleep(ThreadLocalRandom.current().nextInt(1000, 5000)); - leaderLatch.get().start(); - } - catch (Exception e) { - // If an exception gets thrown out here, then the node will zombie out 'cause it won't be looking for - // the latch anymore. I don't believe it's actually possible for an Exception to throw out here, but - // Curator likes to have "throws Exception" on methods so it might happen... - log.makeAlert(e, "I am a zombie").emit(); - } - } + leader = true; + term++; + listener.becomeLeader(); + } + catch (Exception ex) { + log.makeAlert(ex, "listener becomeLeader() failed. Unable to become leader").emit(); + + // give others a chance to become leader. + CloseableUtils.closeAndSuppressExceptions( + createNewLeaderLatchWithListener(), + e -> log.warn("Could not close old leader latch; continuing with new one anyway.") + ); + + leader = false; + try { + // Small delay before starting the latch so that others waiting are chosen to become leader. + Thread.sleep(ThreadLocalRandom.current().nextInt(1000, 5000)); + leaderLatch.get().start(); + } + catch (Exception e) { + // If an exception gets thrown out here, then the node will zombie out 'cause it won't be looking for + // the latch anymore. I don't believe it's actually possible for an Exception to throw out here, but + // Curator likes to have "throws Exception" on methods so it might happen... + log.makeAlert(e, "I am a zombie").emit(); } + } + } - @Override - public void notLeader() - { - try { - if (!leader) { - log.warn("I'm being asked to stop being leader. But I am not the leader. 
Ignored event."); - return; - } - - leader = false; - listener.stopBeingLeader(); - } - catch (Exception ex) { - log.makeAlert(ex, "listener.stopBeingLeader() failed. Unable to stopBeingLeader").emit(); - } + @Override + public void notLeader() + { + try { + if (!leader) { + log.warn("I'm being asked to stop being leader. But I am not the leader. Ignored event."); + return; } - }, - listenerExecutor - ); + + leader = false; + listener.stopBeingLeader(); + } + catch (Exception ex) { + log.makeAlert(ex, "listener.stopBeingLeader() failed. Unable to stopBeingLeader").emit(); + } + } + }, listenerExecutor); Review Comment: This seems like only a formatting change, please revert this. ########## server/src/main/java/org/apache/druid/curator/discovery/CuratorDruidLeaderSelector.java: ########## @@ -80,66 +77,62 @@ private LeaderLatch createNewLeaderLatch() private LeaderLatch createNewLeaderLatchWithListener() { final LeaderLatch newLeaderLatch = createNewLeaderLatch(); + newLeaderLatch.addListener(new LeaderLatchListener() + { + @Override + public void isLeader() + { + try { + if (leader) { + log.warn("I'm being asked to become leader. But I am already the leader. Ignored event."); + return; + } - newLeaderLatch.addListener( - new LeaderLatchListener() - { - @Override - public void isLeader() - { - try { - if (leader) { - log.warn("I'm being asked to become leader. But I am already the leader. Ignored event."); - return; - } - - leader = true; - term++; - listener.becomeLeader(); - } - catch (Exception ex) { - log.makeAlert(ex, "listener becomeLeader() failed. Unable to become leader").emit(); - - // give others a chance to become leader. - CloseableUtils.closeAndSuppressExceptions( - createNewLeaderLatchWithListener(), - e -> log.warn("Could not close old leader latch; continuing with new one anyway.") - ); - - leader = false; - try { - //Small delay before starting the latch so that others waiting are chosen to become leader. 
- Thread.sleep(ThreadLocalRandom.current().nextInt(1000, 5000)); - leaderLatch.get().start(); - } - catch (Exception e) { - // If an exception gets thrown out here, then the node will zombie out 'cause it won't be looking for - // the latch anymore. I don't believe it's actually possible for an Exception to throw out here, but - // Curator likes to have "throws Exception" on methods so it might happen... - log.makeAlert(e, "I am a zombie").emit(); - } - } + leader = true; + term++; + listener.becomeLeader(); + } + catch (Exception ex) { + log.makeAlert(ex, "listener becomeLeader() failed. Unable to become leader").emit(); + + // give others a chance to become leader. + CloseableUtils.closeAndSuppressExceptions( + createNewLeaderLatchWithListener(), + e -> log.warn("Could not close old leader latch; continuing with new one anyway.") + ); + + leader = false; + try { + // Small delay before starting the latch so that others waiting are chosen to become leader. + Thread.sleep(ThreadLocalRandom.current().nextInt(1000, 5000)); + leaderLatch.get().start(); + } + catch (Exception e) { + // If an exception gets thrown out here, then the node will zombie out 'cause it won't be looking for + // the latch anymore. I don't believe it's actually possible for an Exception to throw out here, but + // Curator likes to have "throws Exception" on methods so it might happen... + log.makeAlert(e, "I am a zombie").emit(); } + } + } - @Override - public void notLeader() - { - try { - if (!leader) { - log.warn("I'm being asked to stop being leader. But I am not the leader. Ignored event."); - return; - } - - leader = false; - listener.stopBeingLeader(); - } - catch (Exception ex) { - log.makeAlert(ex, "listener.stopBeingLeader() failed. Unable to stopBeingLeader").emit(); - } + @Override + public void notLeader() + { + try { + if (!leader) { Review Comment: When connection is lost, the `notLeader()` method is also called. 
So I wonder whether we even need a connection state listener, or whether we should just recreate the latch in this method. ########## server/src/main/java/org/apache/druid/curator/discovery/CuratorDruidLeaderSelector.java: ########## @@ -80,66 +77,62 @@ private LeaderLatch createNewLeaderLatch() private LeaderLatch createNewLeaderLatchWithListener() { final LeaderLatch newLeaderLatch = createNewLeaderLatch(); + newLeaderLatch.addListener(new LeaderLatchListener() + { + @Override + public void isLeader() + { + try { + if (leader) { Review Comment: We need to handle the case where this method is called with `newLeaderLatch.getState() == CLOSED`. ########## server/src/main/java/org/apache/druid/curator/discovery/CuratorDruidLeaderSelector.java: ########## @@ -215,4 +208,37 @@ public void unregisterListener() CloseableUtils.closeAndSuppressExceptions(leaderLatch.get(), e -> log.warn(e, "Failed to close LeaderLatch.")); listenerExecutor.shutdownNow(); } + + // Method to handle connection state changes + private void handleConnectionStateChanged(CuratorFramework client, ConnectionState newState) + { + switch (newState) { + case SUSPENDED: + case LOST: + recreateLeaderLatch(); + break; + case RECONNECTED: + // Connection reestablished, no action needed here + break; + default: + // Do nothing for other states + break; + } + } + + private void recreateLeaderLatch() + { + // Close existing leader latch + CloseableUtils.closeAndSuppressExceptions(leaderLatch.get(), e -> log.warn(e, "Failed to close LeaderLatch.")); + + // Create and start a new leader latch + LeaderLatch newLeaderLatch = createNewLeaderLatchWithListener(); + try { + newLeaderLatch.start(); + } + catch (Exception ex) { + throw new RuntimeException("Failed to start new LeaderLatch after session change", ex); + } + leaderLatch.set(newLeaderLatch); Review Comment: Please take a look at these lines in the code: 
https://github.com/apache/druid/blob/b53d75758fd5b8d30cbd3836bfa0a954f01ced84/server/src/main/java/org/apache/druid/curator/discovery/CuratorDruidLeaderSelector.java#L103-L120 The new method `recreateLeaderLatch()` needs to do exactly this. ########## server/src/main/java/org/apache/druid/curator/discovery/CuratorDruidLeaderSelector.java: ########## @@ -215,4 +208,37 @@ public void unregisterListener() CloseableUtils.closeAndSuppressExceptions(leaderLatch.get(), e -> log.warn(e, "Failed to close LeaderLatch.")); listenerExecutor.shutdownNow(); } + + // Method to handle connection state changes + private void handleConnectionStateChanged(CuratorFramework client, ConnectionState newState) + { + switch (newState) { + case SUSPENDED: + case LOST: + recreateLeaderLatch(); + break; + case RECONNECTED: + // Connection reestablished, no action needed here + break; + default: + // Do nothing for other states + break; + } + } + + private void recreateLeaderLatch() + { + // Close existing leader latch + CloseableUtils.closeAndSuppressExceptions(leaderLatch.get(), e -> log.warn(e, "Failed to close LeaderLatch.")); + + // Create and start a new leader latch + LeaderLatch newLeaderLatch = createNewLeaderLatchWithListener(); Review Comment: The latch returned by `createNewLeaderLatchWithListener()` is actually the old latch: the existing `isLeader()` error path closes that return value as the "old leader latch" and then starts `leaderLatch.get()`, so starting the returned latch here would start the wrong one. ########## server/src/main/java/org/apache/druid/curator/discovery/CuratorDruidLeaderSelector.java: ########## @@ -215,4 +208,37 @@ public void unregisterListener() CloseableUtils.closeAndSuppressExceptions(leaderLatch.get(), e -> log.warn(e, "Failed to close LeaderLatch.")); listenerExecutor.shutdownNow(); } + + // Method to handle connection state changes Review Comment: ```suggestion /** * Handles connection state changes. Recreates the leader latch if the connection to ZooKeeper is lost. */ ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org For additional commands, e-mail: commits-h...@druid.apache.org