kfaraz commented on code in PR #16528:
URL: https://github.com/apache/druid/pull/16528#discussion_r1622740278


##########
server/src/main/java/org/apache/druid/curator/discovery/CuratorDruidLeaderSelector.java:
##########
@@ -65,11 +63,10 @@ public CuratorDruidLeaderSelector(CuratorFramework curator, 
@Self DruidNode self
     this.curator = curator;
     this.self = self;
     this.latchPath = latchPath;
-
-    // Creating a LeaderLatch here allows us to query for the current leader. 
We will not be considered for leadership
-    // election until LeaderLatch.start() is called in registerListener(). 
This allows clients to observe the current
-    // leader without being involved in the election.
     this.leaderLatch.set(createNewLeaderLatch());
+
+    // Adding ConnectionStateListener to handle session changes using a method 
reference

Review Comment:
   This comment is not needed; the code is self-explanatory.



##########
server/src/main/java/org/apache/druid/curator/discovery/CuratorDruidLeaderSelector.java:
##########
@@ -80,66 +77,62 @@ private LeaderLatch createNewLeaderLatch()
   private LeaderLatch createNewLeaderLatchWithListener()
   {
     final LeaderLatch newLeaderLatch = createNewLeaderLatch();
+    newLeaderLatch.addListener(new LeaderLatchListener()
+    {
+      @Override
+      public void isLeader()
+      {
+        try {
+          if (leader) {
+            log.warn("I'm being asked to become leader. But I am already the 
leader. Ignored event.");
+            return;
+          }
 
-    newLeaderLatch.addListener(
-        new LeaderLatchListener()
-        {
-          @Override
-          public void isLeader()
-          {
-            try {
-              if (leader) {
-                log.warn("I'm being asked to become leader. But I am already 
the leader. Ignored event.");
-                return;
-              }
-
-              leader = true;
-              term++;
-              listener.becomeLeader();
-            }
-            catch (Exception ex) {
-              log.makeAlert(ex, "listener becomeLeader() failed. Unable to 
become leader").emit();
-
-              // give others a chance to become leader.
-              CloseableUtils.closeAndSuppressExceptions(
-                  createNewLeaderLatchWithListener(),
-                  e -> log.warn("Could not close old leader latch; continuing 
with new one anyway.")
-              );
-
-              leader = false;
-              try {
-                //Small delay before starting the latch so that others waiting 
are chosen to become leader.
-                Thread.sleep(ThreadLocalRandom.current().nextInt(1000, 5000));
-                leaderLatch.get().start();
-              }
-              catch (Exception e) {
-                // If an exception gets thrown out here, then the node will 
zombie out 'cause it won't be looking for
-                // the latch anymore.  I don't believe it's actually possible 
for an Exception to throw out here, but
-                // Curator likes to have "throws Exception" on methods so it 
might happen...
-                log.makeAlert(e, "I am a zombie").emit();
-              }
-            }
+          leader = true;
+          term++;
+          listener.becomeLeader();
+        }
+        catch (Exception ex) {
+          log.makeAlert(ex, "listener becomeLeader() failed. Unable to become 
leader").emit();
+
+          // give others a chance to become leader.
+          CloseableUtils.closeAndSuppressExceptions(
+              createNewLeaderLatchWithListener(),
+              e -> log.warn("Could not close old leader latch; continuing with 
new one anyway.")
+          );
+
+          leader = false;
+          try {
+            // Small delay before starting the latch so that others waiting 
are chosen to become leader.
+            Thread.sleep(ThreadLocalRandom.current().nextInt(1000, 5000));
+            leaderLatch.get().start();
+          }
+          catch (Exception e) {
+            // If an exception gets thrown out here, then the node will zombie 
out 'cause it won't be looking for
+            // the latch anymore.  I don't believe it's actually possible for 
an Exception to throw out here, but
+            // Curator likes to have "throws Exception" on methods so it might 
happen...
+            log.makeAlert(e, "I am a zombie").emit();
           }
+        }
+      }
 
-          @Override
-          public void notLeader()
-          {
-            try {
-              if (!leader) {
-                log.warn("I'm being asked to stop being leader. But I am not 
the leader. Ignored event.");
-                return;
-              }
-
-              leader = false;
-              listener.stopBeingLeader();
-            }
-            catch (Exception ex) {
-              log.makeAlert(ex, "listener.stopBeingLeader() failed. Unable to 
stopBeingLeader").emit();
-            }
+      @Override
+      public void notLeader()
+      {
+        try {
+          if (!leader) {
+            log.warn("I'm being asked to stop being leader. But I am not the 
leader. Ignored event.");
+            return;
           }
-        },
-        listenerExecutor
-    );
+
+          leader = false;
+          listener.stopBeingLeader();
+        }
+        catch (Exception ex) {
+          log.makeAlert(ex, "listener.stopBeingLeader() failed. Unable to 
stopBeingLeader").emit();
+        }
+      }
+    }, listenerExecutor);

Review Comment:
   This seems to be only a formatting change; please revert it.



##########
server/src/main/java/org/apache/druid/curator/discovery/CuratorDruidLeaderSelector.java:
##########
@@ -80,66 +77,62 @@ private LeaderLatch createNewLeaderLatch()
   private LeaderLatch createNewLeaderLatchWithListener()
   {
     final LeaderLatch newLeaderLatch = createNewLeaderLatch();
+    newLeaderLatch.addListener(new LeaderLatchListener()
+    {
+      @Override
+      public void isLeader()
+      {
+        try {
+          if (leader) {
+            log.warn("I'm being asked to become leader. But I am already the 
leader. Ignored event.");
+            return;
+          }
 
-    newLeaderLatch.addListener(
-        new LeaderLatchListener()
-        {
-          @Override
-          public void isLeader()
-          {
-            try {
-              if (leader) {
-                log.warn("I'm being asked to become leader. But I am already 
the leader. Ignored event.");
-                return;
-              }
-
-              leader = true;
-              term++;
-              listener.becomeLeader();
-            }
-            catch (Exception ex) {
-              log.makeAlert(ex, "listener becomeLeader() failed. Unable to 
become leader").emit();
-
-              // give others a chance to become leader.
-              CloseableUtils.closeAndSuppressExceptions(
-                  createNewLeaderLatchWithListener(),
-                  e -> log.warn("Could not close old leader latch; continuing 
with new one anyway.")
-              );
-
-              leader = false;
-              try {
-                //Small delay before starting the latch so that others waiting 
are chosen to become leader.
-                Thread.sleep(ThreadLocalRandom.current().nextInt(1000, 5000));
-                leaderLatch.get().start();
-              }
-              catch (Exception e) {
-                // If an exception gets thrown out here, then the node will 
zombie out 'cause it won't be looking for
-                // the latch anymore.  I don't believe it's actually possible 
for an Exception to throw out here, but
-                // Curator likes to have "throws Exception" on methods so it 
might happen...
-                log.makeAlert(e, "I am a zombie").emit();
-              }
-            }
+          leader = true;
+          term++;
+          listener.becomeLeader();
+        }
+        catch (Exception ex) {
+          log.makeAlert(ex, "listener becomeLeader() failed. Unable to become 
leader").emit();
+
+          // give others a chance to become leader.
+          CloseableUtils.closeAndSuppressExceptions(
+              createNewLeaderLatchWithListener(),
+              e -> log.warn("Could not close old leader latch; continuing with 
new one anyway.")
+          );
+
+          leader = false;
+          try {
+            // Small delay before starting the latch so that others waiting 
are chosen to become leader.
+            Thread.sleep(ThreadLocalRandom.current().nextInt(1000, 5000));
+            leaderLatch.get().start();
+          }
+          catch (Exception e) {
+            // If an exception gets thrown out here, then the node will zombie 
out 'cause it won't be looking for
+            // the latch anymore.  I don't believe it's actually possible for 
an Exception to throw out here, but
+            // Curator likes to have "throws Exception" on methods so it might 
happen...
+            log.makeAlert(e, "I am a zombie").emit();
           }
+        }
+      }
 
-          @Override
-          public void notLeader()
-          {
-            try {
-              if (!leader) {
-                log.warn("I'm being asked to stop being leader. But I am not 
the leader. Ignored event.");
-                return;
-              }
-
-              leader = false;
-              listener.stopBeingLeader();
-            }
-            catch (Exception ex) {
-              log.makeAlert(ex, "listener.stopBeingLeader() failed. Unable to 
stopBeingLeader").emit();
-            }
+      @Override
+      public void notLeader()
+      {
+        try {
+          if (!leader) {

Review Comment:
   When the connection is lost, the `notLeader()` method is also called. So I 
wonder whether we even need a state-change listener, or whether we should just 
recreate the latch in this method.



##########
server/src/main/java/org/apache/druid/curator/discovery/CuratorDruidLeaderSelector.java:
##########
@@ -80,66 +77,62 @@ private LeaderLatch createNewLeaderLatch()
   private LeaderLatch createNewLeaderLatchWithListener()
   {
     final LeaderLatch newLeaderLatch = createNewLeaderLatch();
+    newLeaderLatch.addListener(new LeaderLatchListener()
+    {
+      @Override
+      public void isLeader()
+      {
+        try {
+          if (leader) {

Review Comment:
   We need to handle the case where this method is called with 
`newLeaderLatch.getState() == CLOSED`.



##########
server/src/main/java/org/apache/druid/curator/discovery/CuratorDruidLeaderSelector.java:
##########
@@ -215,4 +208,37 @@ public void unregisterListener()
     CloseableUtils.closeAndSuppressExceptions(leaderLatch.get(), e -> 
log.warn(e, "Failed to close LeaderLatch."));
     listenerExecutor.shutdownNow();
   }
+
+  // Method to handle connection state changes
+  private void handleConnectionStateChanged(CuratorFramework client, 
ConnectionState newState)
+  {
+    switch (newState) {
+      case SUSPENDED:
+      case LOST:
+        recreateLeaderLatch();
+        break;
+      case RECONNECTED:
+        // Connection reestablished, no action needed here
+        break;
+      default:
+        // Do nothing for other states
+        break;
+    }
+  }
+
+  private void recreateLeaderLatch()
+  {
+    // Close existing leader latch
+    CloseableUtils.closeAndSuppressExceptions(leaderLatch.get(), e -> 
log.warn(e, "Failed to close LeaderLatch."));
+
+    // Create and start a new leader latch
+    LeaderLatch newLeaderLatch = createNewLeaderLatchWithListener();
+    try {
+      newLeaderLatch.start();
+    }
+    catch (Exception ex) {
+      throw new RuntimeException("Failed to start new LeaderLatch after 
session change", ex);
+    }
+    leaderLatch.set(newLeaderLatch);

Review Comment:
   Please take a look at these lines in the code:
   
https://github.com/apache/druid/blob/b53d75758fd5b8d30cbd3836bfa0a954f01ced84/server/src/main/java/org/apache/druid/curator/discovery/CuratorDruidLeaderSelector.java#L103-L120
   
   The new method `recreateLeaderLatch()` needs to do exactly this.



##########
server/src/main/java/org/apache/druid/curator/discovery/CuratorDruidLeaderSelector.java:
##########
@@ -215,4 +208,37 @@ public void unregisterListener()
     CloseableUtils.closeAndSuppressExceptions(leaderLatch.get(), e -> 
log.warn(e, "Failed to close LeaderLatch."));
     listenerExecutor.shutdownNow();
   }
+
+  // Method to handle connection state changes
+  private void handleConnectionStateChanged(CuratorFramework client, 
ConnectionState newState)
+  {
+    switch (newState) {
+      case SUSPENDED:
+      case LOST:
+        recreateLeaderLatch();
+        break;
+      case RECONNECTED:
+        // Connection reestablished, no action needed here
+        break;
+      default:
+        // Do nothing for other states
+        break;
+    }
+  }
+
+  private void recreateLeaderLatch()
+  {
+    // Close existing leader latch
+    CloseableUtils.closeAndSuppressExceptions(leaderLatch.get(), e -> 
log.warn(e, "Failed to close LeaderLatch."));
+
+    // Create and start a new leader latch
+    LeaderLatch newLeaderLatch = createNewLeaderLatchWithListener();

Review Comment:
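   The latch returned by `createNewLeaderLatchWithListener()` is actually the 
old latch, so `recreateLeaderLatch()` ends up calling `start()` on the latch it 
has just closed instead of on the newly created one.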



##########
server/src/main/java/org/apache/druid/curator/discovery/CuratorDruidLeaderSelector.java:
##########
@@ -215,4 +208,37 @@ public void unregisterListener()
     CloseableUtils.closeAndSuppressExceptions(leaderLatch.get(), e -> 
log.warn(e, "Failed to close LeaderLatch."));
     listenerExecutor.shutdownNow();
   }
+
+  // Method to handle connection state changes

Review Comment:
   ```suggestion
     /**
      * Handles connection state changes. Recreates the leader latch if the 
connection to ZooKeeper is suspended or lost.
      */
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org
For additional commands, e-mail: commits-h...@druid.apache.org

Reply via email to